Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,569 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+import org.apache.log4j.Logger
+import kafka.cluster._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import java.net.InetAddress
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.api.OffsetRequest
+
+/**
+ * This class handles the consumer's interaction with zookeeper
+ *
+ * Directories:
+ * 1. Consumer id registry:
+ * /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
+ * A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode
+ * and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone.
+ * A consumer subscribes to event changes of the consumer id registry within its group.
+ *
+ * The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential
+ * ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out
+ * whether the creation of a sequential znode has succeeded or not. More details can be found at
+ * (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling)
+ *
+ * 2. Broker node registry:
+ * /brokers/[0...N] --> { "host" : "host:port",
+ * "topics" : {"topic1": ["partition1" ... "partitionN"], ...,
+ * "topicN": ["partition1" ... "partitionN"] } }
+ * This is a list of all present broker nodes. A unique logical node id is configured on each broker node. A broker
+ * node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode
+ * is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that
+ * the broker serves, (3) a list of logical partitions assigned to each topic on the broker.
+ * A consumer subscribes to event changes of the broker node registry.
+ *
+ * 3. Partition owner registry:
+ * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id
+ * This stores the mapping between broker partitions and consumers. Each partition is owned by a unique consumer
+ * within a consumer group. The mapping is reestablished after each rebalancing.
+ *
+ * 4. Consumer offset tracking:
+ * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
+ * Each consumer tracks the offset of the latest message consumed for each partition.
+ *
+ */
+private[kafka] object ZookeeperConsumerConnector {
+ val MAX_N_RETRIES = 4
+ val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
+}
+
+/**
+ * JMX interface for monitoring consumer
+ */
+trait ZookeeperConsumerConnectorMBean {
+ def getPartOwnerStats: String
+ def getConsumerGroup: String
+ def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
+ def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
+ def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
+}
+
+private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
+ val enableFetcher: Boolean) // for testing only
+ extends ConsumerConnector with ZookeeperConsumerConnectorMBean {
+
+ private val logger = Logger.getLogger(getClass())
+ private val isShuttingDown = new AtomicBoolean(false)
+ private val rebalanceLock = new Object
+ private var fetcher: Option[Fetcher] = None
+ private var zkClient: ZkClient = null
+ private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
+ // queues : (topic,consumerThreadId) -> queue
+ private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
+ private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+ connectZk
+ createFetcher
+ if (config.autoCommit) {
+ logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
+ scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
+ }
+
+ def this(config: ConsumerConfig) = this(config, true)
+
+ def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]] = {
+ consume(topicCountMap)
+ }
+
+ private def createFetcher() {
+ if (enableFetcher)
+ fetcher = Some(new Fetcher(config, zkClient))
+ }
+
+ private def connectZk() {
+ logger.info("Connecting to zookeeper instance at " + config.zkConnect)
+ zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+ }
+
+ def shutdown() {
+ val canShutdown = isShuttingDown.compareAndSet(false, true);
+ if (canShutdown) {
+ logger.info("ZKConsumerConnector shutting down")
+ try {
+ scheduler.shutdown
+ fetcher match {
+ case Some(f) => f.shutdown
+ case None =>
+ }
+        sendShutdownToAllQueues
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+ }
+ catch {
+ case e =>
+ logger.fatal(e)
+ logger.fatal(Utils.stackTrace(e))
+ }
+ logger.info("ZKConsumerConnector shut down completed")
+ }
+ }
+
+ def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream]] = {
+ logger.debug("entering consume ")
+ if (topicCountMap == null)
+ throw new RuntimeException("topicCountMap is null")
+
+ val dirs = new ZKGroupDirs(config.groupId)
+ var ret = new mutable.HashMap[String,List[KafkaMessageStream]]
+
+ var consumerUuid : String = null
+ config.consumerId match {
+ case Some(consumerId) // for testing only
+ => consumerUuid = consumerId
+ case None // generate unique consumerId automatically
+ => consumerUuid = InetAddress.getLocalHost.getHostName + "-" + System.currentTimeMillis
+ }
+ val consumerIdString = config.groupId + "_" + consumerUuid
+ val topicCount = new TopicCount(consumerIdString, topicCountMap)
+
+ // listener to consumer and partition changes
+ val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString)
+ registerConsumerInZK(dirs, consumerIdString, topicCount)
+ zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+
+ // create a queue per topic per consumer thread
+ val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
+ for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
+ var streamList: List[KafkaMessageStream] = Nil
+ for (threadId <- threadIdSet) {
+ val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+ queues.put((topic, threadId), stream)
+ streamList ::= new KafkaMessageStream(stream, config.consumerTimeoutMs)
+ }
+ ret += (topic -> streamList)
+ logger.debug("adding topic " + topic + " and stream to map..")
+
+ // register on broker partition path changes
+ val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
+ ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)
+ zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+ }
+
+ // register listener for session expired event
+ zkClient.subscribeStateChanges(
+      new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener))
+
+ // explicitly trigger load balancing for this consumer
+ loadBalancerListener.syncedRebalance
+ ret
+ }
+
+ private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+ logger.info("begin registering consumer " + consumerIdString + " in ZK")
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+ logger.info("end registering consumer " + consumerIdString + " in ZK")
+ }
+
+  private def sendShutdownToAllQueues() = {
+ for (queue <- queues.values) {
+ logger.debug("Clearing up queue")
+ queue.clear
+ queue.put(ZookeeperConsumerConnector.shutdownCommand)
+ logger.debug("Cleared queue and sent shutdown command")
+ }
+ }
+
+ def autoCommit() {
+ if(logger.isTraceEnabled)
+ logger.trace("auto committing")
+ try {
+ commitOffsets
+ }
+ catch {
+ case t: Throwable =>
+ // log it and let it go
+ logger.error("exception during autoCommit: ", t)
+ }
+ }
+
+ def commitOffsets() {
+ if (zkClient == null)
+ return
+ for ((topic, infos) <- topicRegistry) {
+ val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+ for (info <- infos.values) {
+ val newOffset = info.getConsumeOffset
+ try {
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
+ newOffset.toString)
+ }
+ catch {
+ case t: Throwable =>
+ // log it and let it go
+ logger.warn("exception during commitOffsets: " + t + Utils.stackTrace(t))
+ }
+ if(logger.isDebugEnabled)
+ logger.debug("Committed offset " + newOffset + " for topic " + info)
+ }
+ }
+ }
+
+ // for JMX
+ def getPartOwnerStats(): String = {
+ val builder = new StringBuilder
+ for ((topic, infos) <- topicRegistry) {
+ builder.append("\n" + topic + ": [")
+ val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+ for(partition <- infos.values) {
+ builder.append("\n {")
+ builder.append{partition.partition.name}
+ builder.append(",fetch offset:" + partition.getFetchOffset)
+ builder.append(",consumer offset:" + partition.getConsumeOffset)
+ builder.append("}")
+ }
+ builder.append("\n ]")
+ }
+ builder.toString
+ }
+
+ // for JMX
+ def getConsumerGroup(): String = config.groupId
+
+ def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long =
+ getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
+
+ def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
+ val partition = new Partition(brokerId, partitionId)
+ val partitionInfos = topicRegistry.get(topic)
+ if (partitionInfos != null) {
+ val partitionInfo = partitionInfos.get(partition)
+ if (partitionInfo != null)
+ return partitionInfo.getConsumeOffset
+ }
+
+ //otherwise, try to get it from zookeeper
+ try {
+ val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+ val znode = topicDirs.consumerOffsetDir + "/" + partition.name
+ val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+ if (offsetString != null)
+ return offsetString.toLong
+ else
+ return -1
+ }
+ catch {
+ case e =>
+ logger.error("error in getConsumedOffset JMX ", e)
+ }
+ return -2
+ }
+
+ def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
+ var simpleConsumer: SimpleConsumer = null
+ var producedOffset: Long = -1L
+ try {
+ val cluster = ZkUtils.getCluster(zkClient)
+ val broker = cluster.getBroker(brokerId)
+ simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
+ ConsumerConfig.SocketBufferSize)
+ val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId,
+ OffsetRequest.LatestTime, 1)
+ producedOffset = latestOffset(0)
+ }
+ catch {
+ case e =>
+ logger.error("error in getLatestOffset jmx ", e)
+ }
+ finally {
+ if (simpleConsumer != null)
+ simpleConsumer.close
+ }
+ producedOffset
+ }
+
+  class ZKSessionExpireListener(val dirs: ZKGroupDirs,
+ val consumerIdString: String,
+ val topicCount: TopicCount,
+ val loadBalancerListener: ZKRebalancerListener)
+ extends IZkStateListener {
+ @throws(classOf[Exception])
+ def handleStateChanged(state: KeeperState) {
+ // do nothing, since zkclient will do reconnect for us.
+ }
+
+ /**
+ * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+ * any ephemeral nodes here.
+ *
+ * @throws Exception
+ * On any error.
+ */
+ @throws(classOf[Exception])
+ def handleNewSession() {
+ /**
+ * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
+ * connection for us. We need to release the ownership of the current consumer and re-register this
+ * consumer in the consumer registry and trigger a rebalance.
+ */
+      logger.info("ZK expired; release old broker partition ownership; re-register consumer " + consumerIdString)
+ loadBalancerListener.resetState
+ registerConsumerInZK(dirs, consumerIdString, topicCount)
+ // explicitly trigger load balancing for this consumer
+ loadBalancerListener.syncedRebalance
+
+ // There is no need to resubscribe to child and state changes.
+ // The child change watchers will be set inside rebalance when we read the children list.
+ }
+
+ }
+
+ class ZKRebalancerListener(val group: String, val consumerIdString: String)
+ extends IZkChildListener {
+ private val dirs = new ZKGroupDirs(group)
+ private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+ private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+
+ @throws(classOf[Exception])
+ def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+ syncedRebalance
+ }
+
+ private def releasePartitionOwnership() {
+ for ((topic, infos) <- topicRegistry) {
+ val topicDirs = new ZKGroupTopicDirs(group, topic)
+ for(partition <- infos.keys) {
+ val znode = topicDirs.consumerOwnerDir + "/" + partition
+ ZkUtils.deletePath(zkClient, znode)
+ if(logger.isDebugEnabled)
+ logger.debug("Consumer " + consumerIdString + " releasing " + znode)
+ }
+ }
+ }
+
+ private def getConsumersPerTopic(group: String) : mutable.Map[String, List[String]] = {
+ val consumers = ZkUtils.getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
+ val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
+ for (consumer <- consumers) {
+ val topicCount = getTopicCount(consumer)
+ for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
+ for (consumerThreadId <- consumerThreadIdSet)
+ consumersPerTopicMap.get(topic) match {
+ case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
+ case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
+ }
+ }
+ }
+ for ( (topic, consumerList) <- consumersPerTopicMap )
+ consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
+ consumersPerTopicMap
+ }
+
+ private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
+ newPartMap: Map[String,List[String]],
+ oldPartMap: Map[String,List[String]],
+ newConsumerMap: Map[String,List[String]],
+ oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
+ var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
+ for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
+ if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
+ relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
+ relevantTopicThreadIdsMap
+ }
+
+ private def getTopicCount(consumerId: String) : TopicCount = {
+ val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+ TopicCount.constructTopicCount(consumerId, topicCountJson)
+ }
+
+ def resetState() {
+ topicRegistry.clear
+ oldConsumersPerTopicMap.clear
+ oldPartitionsPerTopicMap.clear
+ }
+
+ def syncedRebalance() {
+ rebalanceLock synchronized {
+ for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) {
+ logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+ var done = false
+ try {
+ done = rebalance
+ }
+ catch {
+ case e =>
+ // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+ // For example, a ZK node can disappear between the time we get all children and the time we try to get
+ // the value of a child. Just let this go since another rebalance will be triggered.
+ logger.info("exception during rebalance " + e)
+ }
+ logger.info("end rebalancing consumer " + consumerIdString + " try #" + i)
+ if (done)
+ return
+ // release all partitions, reset state and retry
+ releasePartitionOwnership
+ resetState
+ Thread.sleep(config.zkSyncTimeMs)
+ }
+ }
+
+    throw new RuntimeException(consumerIdString + " can't rebalance after " + ZookeeperConsumerConnector.MAX_N_RETRIES + " retries")
+ }
+
+ private def rebalance(): Boolean = {
+ // testing code
+ //if ("group1_consumer1" == consumerIdString) {
+ // logger.info("sleeping " + consumerIdString)
+ // Thread.sleep(20)
+ //}
+
+ val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic
+ val cluster = ZkUtils.getCluster(zkClient)
+ val consumersPerTopicMap = getConsumersPerTopic(group)
+ val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
+ val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
+ if (relevantTopicThreadIdsMap.size <= 0) {
+ logger.info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.")
+ return true
+ }
+
+ logger.info("Committing all offsets")
+ commitOffsets
+
+ logger.info("Releasing partition ownership")
+ releasePartitionOwnership
+
+ val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
+ for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
+ topicRegistry.remove(topic)
+ topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
+
+ val topicDirs = new ZKGroupTopicDirs(group, topic)
+ val curConsumers = consumersPerTopicMap.get(topic).get
+ var curPartitions: List[String] = partitionsPerTopicMap.get(topic).get
+
+ val nPartsPerConsumer = curPartitions.size / curConsumers.size
+ val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+ logger.info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
+ " for topic " + topic + " with consumers: " + curConsumers)
+
+ for (consumerThreadId <- consumerThreadIdSet) {
+ val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+ assert(myConsumerPosition >= 0)
+ val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+ val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+ /**
+ * Range-partition the sorted partitions to consumers for better locality.
+ * The first few consumers pick up an extra partition, if any.
+ */
+ if (nParts <= 0)
+ logger.warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+ else {
+ for (i <- startPart until startPart + nParts) {
+ val partition = curPartitions(i)
+ logger.info(consumerThreadId + " attempting to claim partition " + partition)
+ if (!processPartition(topicDirs, partition, topic, consumerThreadId))
+ return false
+ }
+ queuesToBeCleared += queues.get((topic, consumerThreadId))
+ }
+ }
+ }
+ updateFetcher(cluster, queuesToBeCleared)
+ oldPartitionsPerTopicMap = partitionsPerTopicMap
+ oldConsumersPerTopicMap = consumersPerTopicMap
+ true
+ }
+
+ private def updateFetcher(cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
+ // update partitions for fetcher
+ var allPartitionInfos : List[PartitionTopicInfo] = Nil
+ for (partitionInfos <- topicRegistry.values)
+ for (partition <- partitionInfos.values)
+ allPartitionInfos ::= partition
+ logger.info("Consumer " + consumerIdString + " selected partitions : " +
+ allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
+
+ fetcher match {
+ case Some(f) => f.initConnections(allPartitionInfos, cluster, queuesTobeCleared)
+ case None =>
+ }
+ }
+
+ private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
+ topic: String, consumerThreadId: String) : Boolean = {
+ val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+ try {
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+ }
+ catch {
+ case e: ZkNodeExistsException =>
+ // The node hasn't been deleted by the original owner. So wait a bit and retry.
+ logger.info("waiting for the partition ownership to be deleted: " + partition)
+ return false
+ case e2 => throw e2
+ }
+ addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+ true
+ }
+
+ private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
+ topic: String, consumerThreadId: String) {
+ val partition = Partition.parse(partitionString)
+ val partTopicInfoMap = topicRegistry.get(topic)
+
+ val znode = topicDirs.consumerOffsetDir + "/" + partition.name
+ val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+ // If first time starting a consumer, use default offset.
+ // TODO: handle this better (if client doesn't know initial offsets)
+ val offset : Long = if (offsetString == null) Long.MaxValue else offsetString.toLong
+ val queue = queues.get((topic, consumerThreadId))
+ val consumedOffset = new AtomicLong(offset)
+ val fetchedOffset = new AtomicLong(offset)
+ val partTopicInfo = new PartitionTopicInfo(topic,
+ partition.brokerId,
+ partition,
+ queue,
+ consumedOffset,
+ fetchedOffset,
+ new AtomicInteger(config.fetchSize))
+ partTopicInfoMap.put(partition, partTopicInfo)
+ if (logger.isDebugEnabled)
+ logger.debug(partTopicInfo + " selected new offset " + offset)
+ }
+ }
+}
+
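
As a usage sketch of the connector above: a client builds a ConsumerConfig, asks for one or more streams per topic, and iterates each KafkaMessageStream while the connector handles registration, rebalancing and offset commits behind the scenes. The snippet below is a minimal illustration only; the Consumer.create factory and the "zk.connect"/"groupid" property names are assumptions not shown in this diff.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object ConsumerExample {
      def main(args: Array[String]) {
        val props = new Properties
        props.put("zk.connect", "localhost:2181")   // assumed property name
        props.put("groupid", "group1")              // assumed property name
        val connector = Consumer.create(new ConsumerConfig(props))  // assumed factory for this connector

        // one stream (and hence one fetch queue) for topic "test"
        val streams = connector.createMessageStreams(Map("test" -> 1))
        for (stream <- streams("test"); message <- stream)  // blocks until messages arrive or consumer.timeout.ms elapses
          println(message)
        connector.shutdown()
      }
    }
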
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html Mon Aug 1 23:41:24 2011
@@ -0,0 +1 @@
+This is the consumer API for kafka.
\ No newline at end of file
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer.storage
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import java.util.concurrent.locks._
+
+class MemoryOffsetStorage extends OffsetStorage {
+
+ val offsetAndLock = new ConcurrentHashMap[(Int, String), (AtomicLong, Lock)]
+
+ def reserve(node: Int, topic: String): Long = {
+ val key = (node, topic)
+ if(!offsetAndLock.containsKey(key))
+ offsetAndLock.putIfAbsent(key, (new AtomicLong(0), new ReentrantLock))
+ val (offset, lock) = offsetAndLock.get(key)
+ lock.lock
+ offset.get
+ }
+
+ def commit(node: Int, topic: String, offset: Long) = {
+ val (highwater, lock) = offsetAndLock.get((node, topic))
+ highwater.set(offset)
+ lock.unlock
+ offset
+ }
+
+}
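
Each reserve above takes the per-(node, topic) lock and the matching commit releases it, so a caller pairs them one-to-one around its processing step. A brief sketch of that pattern, with the amount consumed as a placeholder and the "re-commit the old offset on failure" convention as an assumption:

    import kafka.consumer.storage.MemoryOffsetStorage

    object OffsetStorageExample {
      def main(args: Array[String]) {
        val storage = new MemoryOffsetStorage
        val start = storage.reserve(0, "test")        // acquires the lock for (node 0, "test")
        try {
          val consumed = 100L                         // placeholder for bytes/messages actually processed
          storage.commit(0, "test", start + consumed) // records the new high-water mark and releases the lock
        } catch {
          case e: Exception =>
            storage.commit(0, "test", start)          // on failure, re-commit the old offset so the lock is released
        }
      }
    }
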
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer.storage
+
+import kafka.utils.Range
+
+/**
+ * A method for storing offsets for the consumer.
+ * This is used to track the progress of the consumer in the stream.
+ */
+trait OffsetStorage {
+
+ /**
+   * Reserve the offset for the given node and topic, holding it (and any associated lock)
+   * until the corresponding commit.
+   * @param node The broker node id
+   * @param topic The topic name
+   * @return The reserved offset
+ */
+ def reserve(node: Int, topic: String): Long
+
+ /**
+ * Update the offset to the new offset
+ * @param offset The new offset
+ */
+ def commit(node: Int, topic: String, offset: Long)
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer.storage.sql
+
+import java.sql._
+import org.apache.log4j._
+import kafka.utils._
+import kafka.consumer.storage.OffsetStorage
+
+/**
+ * An offset storage implementation that uses an Oracle database to save offsets
+ */
+@nonthreadsafe
+class OracleOffsetStorage(val connection: Connection) extends OffsetStorage {
+
+ private val logger: Logger = Logger.getLogger(classOf[OracleOffsetStorage])
+ private val lock = new Object
+ connection.setAutoCommit(false)
+
+ def reserve(node: Int, topic: String): Long = {
+ /* try to get and lock the offset, if it isn't there, make it */
+ val maybeOffset = selectExistingOffset(connection, node, topic)
+ val offset = maybeOffset match {
+ case Some(offset) => offset
+ case None => {
+ maybeInsertZeroOffset(connection, node, topic)
+ selectExistingOffset(connection, node, topic).get
+ }
+ }
+
+ if(logger.isDebugEnabled)
+      logger.debug("Reserved node " + node + " for topic '" + topic + "' offset " + offset)
+
+ offset
+ }
+
+ def commit(node: Int, topic: String, offset: Long) {
+ var success = false
+ try {
+ updateOffset(connection, node, topic, offset)
+ success = true
+ } finally {
+ commitOrRollback(connection, success)
+ }
+ if(logger.isDebugEnabled)
+ logger.debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
+ }
+
+ def close() {
+ Utils.swallow(logger.error, connection.close())
+ }
+
+ /**
+   * Attempt to insert a zero offset for the given node and topic if no entry exists yet.
+   * @return true iff a new row was inserted
+ */
+ private def maybeInsertZeroOffset(connection: Connection, node: Int, topic: String): Boolean = {
+ val stmt = connection.prepareStatement(
+ """insert into kafka_offsets (node, topic, offset)
+ select ?, ?, 0 from dual where not exists
+ (select null from kafka_offsets where node = ? and topic = ?)""")
+ stmt.setInt(1, node)
+ stmt.setString(2, topic)
+ stmt.setInt(3, node)
+ stmt.setString(4, topic)
+ val updated = stmt.executeUpdate()
+ if(updated > 1)
+ throw new IllegalStateException("More than one key updated by primary key!")
+ else
+ updated == 1
+ }
+
+ /**
+   * Select the existing offset for the given node and topic, if any, locking the row for update.
+   * @return Some(offset) if a row exists, None otherwise
+ */
+ private def selectExistingOffset(connection: Connection, node: Int, topic: String): Option[Long] = {
+ val stmt = connection.prepareStatement(
+ """select offset from kafka_offsets
+ where node = ? and topic = ?
+ for update""")
+ var results: ResultSet = null
+ try {
+ stmt.setInt(1, node)
+ stmt.setString(2, topic)
+ results = stmt.executeQuery()
+ if(!results.next()) {
+ None
+ } else {
+ val offset = results.getLong("offset")
+ if(results.next())
+ throw new IllegalStateException("More than one entry for primary key!")
+ Some(offset)
+ }
+ } finally {
+ close(stmt)
+ close(results)
+ }
+ }
+
+ private def updateOffset(connection: Connection,
+ node: Int,
+ topic: String,
+ newOffset: Long): Unit = {
+ val stmt = connection.prepareStatement("update kafka_offsets set offset = ? where node = ? and topic = ?")
+ try {
+ stmt.setLong(1, newOffset)
+ stmt.setInt(2, node)
+ stmt.setString(3, topic)
+ val updated = stmt.executeUpdate()
+ if(updated != 1)
+ throw new IllegalStateException("Unexpected number of keys updated: " + updated)
+ } finally {
+ close(stmt)
+ }
+ }
+
+
+ private def commitOrRollback(connection: Connection, commit: Boolean) {
+ if(connection != null) {
+ if(commit)
+ Utils.swallow(logger.error, connection.commit())
+ else
+ Utils.swallow(logger.error, connection.rollback())
+ }
+ }
+
+ private def close(rs: ResultSet) {
+ if(rs != null)
+ Utils.swallow(logger.error, rs.close())
+ }
+
+ private def close(stmt: PreparedStatement) {
+ if(stmt != null)
+ Utils.swallow(logger.error, stmt.close())
+ }
+
+ private def close(connection: Connection) {
+ if(connection != null)
+ Utils.swallow(logger.error, connection.close())
+ }
+
+}
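
The statements above assume a kafka_offsets table keyed by (node, topic) with an offset column; its DDL is not part of this patch. A hedged setup sketch, with the JDBC URL, credentials and table definition all assumptions:

    import java.sql.DriverManager
    import kafka.consumer.storage.sql.OracleOffsetStorage

    object OracleOffsetStorageExample {
      def main(args: Array[String]) {
        // Assumed schema, inferred from the queries above:
        //   create table kafka_offsets (node number not null, topic varchar2(255) not null,
        //                               offset number not null, primary key (node, topic))
        val connection = DriverManager.getConnection("jdbc:oracle:thin:@//dbhost:1521/svc", "user", "pass") // assumed URL
        val storage = new OracleOffsetStorage(connection) // the constructor disables auto-commit
        val offset = storage.reserve(0, "test")           // inserts a zero row on first use, then selects it for update
        storage.commit(0, "test", offset + 1000L)         // updates the row and commits the transaction
        storage.close()
      }
    }
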
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import org.apache.log4j.Logger
+import kafka.serializer.Encoder
+import kafka.producer.{ProducerConfig, ProducerPool}
+import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+
+private[javaapi] object Implicits {
+ private val logger = Logger.getLogger(getClass())
+
+ implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet):
+ kafka.message.ByteBufferMessageSet = messageSet.underlying
+
+ implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
+ kafka.javaapi.message.ByteBufferMessageSet = {
+ new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
+ messageSet.getErrorCode)
+ }
+
+ implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
+ if(logger.isDebugEnabled)
+ logger.debug("Implicit instantiation of Java Sync Producer")
+ new kafka.javaapi.producer.SyncProducer(producer)
+ }
+
+ implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
+ if(logger.isDebugEnabled)
+ logger.debug("Implicit instantiation of Sync Producer")
+ producer.underlying
+ }
+
+ implicit def toScalaEventHandler[T](eventHandler: kafka.javaapi.producer.async.EventHandler[T])
+ : kafka.producer.async.EventHandler[T] = {
+ new kafka.producer.async.EventHandler[T] {
+ override def init(props: java.util.Properties) { eventHandler.init(props) }
+ override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) {
+ import collection.JavaConversions._
+ eventHandler.handle(asList(events), producer, encoder)
+ }
+ override def close { eventHandler.close }
+ }
+ }
+
+ implicit def toJavaEventHandler[T](eventHandler: kafka.producer.async.EventHandler[T])
+ : kafka.javaapi.producer.async.EventHandler[T] = {
+ new kafka.javaapi.producer.async.EventHandler[T] {
+ override def init(props: java.util.Properties) { eventHandler.init(props) }
+ override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer,
+ encoder: Encoder[T]) {
+ import collection.JavaConversions._
+ eventHandler.handle(asBuffer(events), producer, encoder)
+ }
+ override def close { eventHandler.close }
+ }
+ }
+
+ implicit def toScalaCbkHandler[T](cbkHandler: kafka.javaapi.producer.async.CallbackHandler[T])
+ : kafka.producer.async.CallbackHandler[T] = {
+ new kafka.producer.async.CallbackHandler[T] {
+ import collection.JavaConversions._
+ override def init(props: java.util.Properties) { cbkHandler.init(props)}
+ override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
+ cbkHandler.beforeEnqueue(data)
+ }
+ override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
+ cbkHandler.afterEnqueue(data, added)
+ }
+ override def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
+ cbkHandler.afterDequeuingExistingData(data)
+ }
+ override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
+ asList(cbkHandler.beforeSendingData(asList(data)))
+ }
+ override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = {
+ asBuffer(cbkHandler.lastBatchBeforeClose)
+ }
+ override def close { cbkHandler.close }
+ }
+ }
+
+ implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T])
+ : kafka.javaapi.producer.async.CallbackHandler[T] = {
+ new kafka.javaapi.producer.async.CallbackHandler[T] {
+ import collection.JavaConversions._
+ override def init(props: java.util.Properties) { cbkHandler.init(props)}
+ override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
+ cbkHandler.beforeEnqueue(data)
+ }
+ override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
+ cbkHandler.afterEnqueue(data, added)
+ }
+ override def afterDequeuingExistingData(data: QueueItem[T] = null)
+ : java.util.List[QueueItem[T]] = {
+ asList(cbkHandler.afterDequeuingExistingData(data))
+ }
+ override def beforeSendingData(data: java.util.List[QueueItem[T]] = null)
+ : java.util.List[QueueItem[T]] = {
+ asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
+ }
+ override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = {
+ asList(cbkHandler.lastBatchBeforeClose)
+ }
+ override def close { cbkHandler.close }
+ }
+ }
+
+ implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse =
+ response.underlying
+
+ implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse =
+ new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets)
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi
+
+import kafka.utils.IteratorTemplate
+import java.nio.ByteBuffer
+import message.ByteBufferMessageSet
+
+class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long]) extends java.lang.Iterable[ByteBufferMessageSet] {
+ val underlyingBuffer = ByteBuffer.wrap(buffer.array)
+ // this has the side effect of setting the initial position of buffer correctly
+ val errorCode = underlyingBuffer.getShort
+
+ import Implicits._
+ val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets, offsets)
+
+ override def toString() = underlying.toString
+
+ def iterator : java.util.Iterator[ByteBufferMessageSet] = {
+ new IteratorTemplate[ByteBufferMessageSet] {
+ val iter = underlying.iterator
+ override def makeNext(): ByteBufferMessageSet = {
+ if(iter.hasNext)
+ iter.next
+ else
+ return allDone
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.network.Request
+import kafka.api.RequestKeys
+import java.nio.ByteBuffer
+
+class ProducerRequest(val topic: String,
+ val partition: Int,
+ val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+ import Implicits._
+ private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
+
+ def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+ def sizeInBytes(): Int = underlying.sizeInBytes
+
+ def getTranslatedPartition(randomSelector: String => Int): Int =
+ underlying.getTranslatedPartition(randomSelector)
+
+ override def toString: String =
+ underlying.toString
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case that: ProducerRequest =>
+ (that canEqual this) && topic == that.topic && partition == that.partition &&
+ messages.equals(that.messages)
+ case _ => false
+ }
+ }
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
+
+ override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.consumer;
+
+import kafka.consumer.KafkaMessageStream;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ConsumerConnector {
+ /**
+ * Create a list of MessageStreams for each topic.
+ *
+   * @param topicCountMap a map of (topic, #streams) pairs
+   * @return a map of (topic, list of KafkaMessageStream) pairs. The number of items in the
+ * list is #streams. Each KafkaMessageStream supports an iterator of messages.
+ */
+ public Map<String, List<KafkaMessageStream>> createMessageStreams(Map<String, Integer> topicCountMap);
+
+ /**
+ * Commit the offsets of all broker partitions connected by this connector.
+ */
+ public void commitOffsets();
+
+ /**
+ * Shut down the connector
+ */
+ public void shutdown();
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.consumer
+
+import kafka.utils.threadsafe
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.javaapi.MultiFetchResponse
+import kafka.api.FetchRequest
+
+/**
+ * A consumer of kafka messages
+ */
+@threadsafe
+class SimpleConsumer(val host: String,
+ val port: Int,
+ val soTimeout: Int,
+ val bufferSize: Int) {
+ val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+
+ /**
+ * Fetch a set of messages from a topic.
+ *
+ * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+ * @return a set of fetched messages
+ */
+ def fetch(request: FetchRequest): ByteBufferMessageSet = {
+ import kafka.javaapi.Implicits._
+ underlying.fetch(request)
+ }
+
+ /**
+ * Combine multiple fetch requests in one call.
+ *
+ * @param fetches a sequence of fetch requests.
+ * @return a sequence of fetch responses
+ */
+ def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = {
+ import scala.collection.JavaConversions._
+ import kafka.javaapi.Implicits._
+ underlying.multifetch(asBuffer(fetches): _*)
+ }
+
+ /**
+ * Get a list of valid offsets (up to maxSize) before the given time.
+ * The result is a list of offsets, in descending order.
+ *
+   * @param time time in milliseconds (-1 for the latest offset available, -2 for the smallest offset available)
+   * @param maxNumOffsets the maximum number of offsets to return
+ * @return an array of offsets
+ */
+ def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] =
+ underlying.getOffsetsBefore(topic, partition, time, maxNumOffsets)
+
+ def close() {
+ underlying.close
+ }
+}
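
A short sketch of the fetch path this wrapper documents: ask for the earliest available offset, fetch from it, and iterate the returned message set. The broker address, topic and fetch size are placeholders, and the kafka.api.FetchRequest(topic, partition, offset, maxSize) constructor is assumed from the rest of the codebase.

    import kafka.api.FetchRequest
    import kafka.javaapi.consumer.SimpleConsumer

    object SimpleConsumerExample {
      def main(args: Array[String]) {
        val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024)
        // -2 asks for the smallest offset available (see getOffsetsBefore above)
        val offsets = consumer.getOffsetsBefore("test", 0, -2L, 1)
        val messages = consumer.fetch(new FetchRequest("test", 0, offsets(0), 64 * 1024))
        val iter = messages.iterator
        while (iter.hasNext)
          println(iter.next.message)
        consumer.close()
      }
    }
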
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi.consumer
+
+import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
+
+/**
+ * This class handles the consumer's interaction with zookeeper
+ *
+ * Directories:
+ * 1. Consumer id registry:
+ * /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
+ * A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode
+ * and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone.
+ * A consumer subscribes to event changes of the consumer id registry within its group.
+ *
+ * The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential
+ * ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out
+ * whether the creation of a sequential znode has succeeded or not. More details can be found at
+ * (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling)
+ *
+ * 2. Broker node registry:
+ * /brokers/[0...N] --> { "host" : "host:port",
+ * "topics" : {"topic1": ["partition1" ... "partitionN"], ...,
+ * "topicN": ["partition1" ... "partitionN"] } }
+ * This is a list of all present broker nodes. A unique logical node id is configured on each broker node. A broker
+ * node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode
+ * is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that
+ * the broker serves, (3) a list of logical partitions assigned to each topic on the broker.
+ * A consumer subscribes to event changes of the broker node registry.
+ *
+ * 3. Partition owner registry:
+ * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id
+ * This stores the mapping between broker partitions and consumers. Each partition is owned by a unique consumer
+ * within a consumer group. The mapping is reestablished after each rebalancing.
+ *
+ * 4. Consumer offset tracking:
+ * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
+ * Each consumer tracks the offset of the latest message consumed for each partition.
+ *
+*/
+
+private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
+ val enableFetcher: Boolean) // for testing only
+ extends ConsumerConnector {
+
+ val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
+
+ def this(config: ConsumerConfig) = this(config, true)
+
+ // for java client
+ def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]):
+ java.util.Map[String,java.util.List[KafkaMessageStream]] = {
+ import scala.collection.JavaConversions._
+
+ val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+ val scalaReturn = underlying.consume(scalaTopicCountMap)
+ val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream]]
+ for ((topic, streams) <- scalaReturn) {
+ var javaStreamList = new java.util.ArrayList[KafkaMessageStream]
+ for (stream <- streams)
+ javaStreamList.add(stream)
+ ret.put(topic, javaStreamList)
+ }
+ ret
+ }
+
+ def commitOffsets() {
+ underlying.commitOffsets
+ }
+
+ def shutdown() {
+ underlying.shutdown
+ }
+}
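
For Java clients the same flow goes through java.util collections on both sides of createMessageStreams. A minimal sketch, written in Scala for brevity; the Consumer.createJavaConsumerConnector factory name and the property names are assumptions, since this connector itself is private[kafka]:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream}

    object JavaApiConsumerExample {
      def main(args: Array[String]) {
        val props = new Properties
        props.put("zk.connect", "localhost:2181")  // assumed property name
        props.put("groupid", "group1")             // assumed property name
        val connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)) // assumed factory

        val topicCountMap = new java.util.HashMap[String, java.lang.Integer]
        topicCountMap.put("test", 1)
        val streams = connector.createMessageStreams(topicCountMap)
        val stream: KafkaMessageStream = streams.get("test").get(0)
        for (message <- stream)  // blocks until messages arrive or consumer.timeout.ms elapses
          println(message)
        connector.shutdown()
      }
    }
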
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.message
+
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+import org.apache.log4j.Logger
+import kafka.message._
+
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+ private val initialOffset: Long = 0L,
+ private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
+ private val logger = Logger.getLogger(getClass())
+ val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
+ initialOffset,
+ errorCode)
+ def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
+
+ def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
+ this(compressionCodec match {
+ case NoCompressionCodec =>
+ val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ val messageIterator = messages.iterator
+ while(messageIterator.hasNext) {
+ val message = messageIterator.next
+ message.serializeTo(buffer)
+ }
+ buffer.rewind
+ buffer
+ case _ =>
+ import scala.collection.JavaConversions._
+ val message = CompressionUtils.compress(asBuffer(messages), compressionCodec)
+ val buffer = ByteBuffer.allocate(message.serializedSize)
+ message.serializeTo(buffer)
+ buffer.rewind
+ buffer
+ }, 0L, ErrorMapping.NoError)
+ }
+
+ def this(messages: java.util.List[Message]) {
+ this(NoCompressionCodec, messages)
+ }
+
+ def validBytes: Long = underlying.validBytes
+
+ def serialized():ByteBuffer = underlying.serialized
+
+ def getInitialOffset = initialOffset
+
+ def getBuffer = buffer
+
+ def getErrorCode = errorCode
+
+ override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
+ val underlyingIterator = underlying.iterator
+ override def hasNext(): Boolean = {
+ underlyingIterator.hasNext
+ }
+
+ override def next(): MessageAndOffset = {
+ underlyingIterator.next
+ }
+
+ override def remove = throw new UnsupportedOperationException("remove API on MessageSet is not supported")
+ }
+
+ override def toString: String = underlying.toString
+
+ def sizeInBytes: Long = underlying.sizeInBytes
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case that: ByteBufferMessageSet =>
+ (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+ case _ => false
+ }
+ }
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
+
+ override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
+
+}
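
As a construction sketch for the Java-facing message set above: build it from a java.util.List of messages, optionally handing in a compression codec so the whole batch is compressed into a single wrapper message. The payloads and codec choice here are arbitrary.

    import java.util.Arrays
    import kafka.javaapi.message.ByteBufferMessageSet
    import kafka.message.{Message, NoCompressionCodec}

    object MessageSetExample {
      def main(args: Array[String]) {
        val messages = Arrays.asList(new Message("hello".getBytes), new Message("world".getBytes))
        // NoCompressionCodec leaves each message as-is; a compression codec would batch them into one message
        val messageSet = new ByteBufferMessageSet(NoCompressionCodec, messages)
        println("total bytes: " + messageSet.sizeInBytes + ", valid bytes: " + messageSet.validBytes)
      }
    }
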
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.message
+
+import java.nio.channels.WritableByteChannel
+import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
+
+/**
+ * A set of messages. A message set has a fixed serialized form, though the container
+ * for the bytes could be either in-memory or on disk. The format of each message is
+ * as follows:
+ * 4 byte size containing an integer N
+ * N message bytes as described in the message class
+ */
+abstract class MessageSet extends java.lang.Iterable[MessageAndOffset] {
+
+ /**
+ * Provides an iterator over the messages in this set
+ */
+ def iterator: java.util.Iterator[MessageAndOffset]
+
+ /**
+ * Gives the total size of this message set in bytes
+ */
+ def sizeInBytes: Long
+
+ /**
+ * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't
+ * match the payload for any message.
+ */
+ def validate(): Unit = {
+ val thisIterator = this.iterator
+ while(thisIterator.hasNext) {
+ val messageAndOffset = thisIterator.next
+ if(!messageAndOffset.message.isValid)
+ throw new InvalidMessageException
+ }
+ }
+}
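The validate() contract above is easy to exercise against the set built in the previous sketch; it raises on the first message whose checksum does not match the payload (fragment continuing the earlier sketch):

    try {
        messages.validate();
    } catch (kafka.message.InvalidMessageException e) {
        System.err.println("corrupt message detected: " + e.getMessage());
    }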
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.producer
+
+import kafka.utils.Utils
+import kafka.producer.async.QueueItem
+import java.util.Properties
+import kafka.producer.{ProducerPool, ProducerConfig, Partitioner}
+import kafka.serializer.Encoder
+
+class Producer[K,V](config: ProducerConfig,
+ partitioner: Partitioner[K],
+ producerPool: ProducerPool[V],
+               populateProducerPool: Boolean = true) /* for testing purposes only. Applications should */
+                                                     /* ideally use one of the other constructors */
+{
+
+ private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool, null)
+
+ /**
+ * This constructor can be used when all config parameters will be specified through the
+ * ProducerConfig object
+ * @param config Producer Configuration object
+ */
+ def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
+ new ProducerPool[V](config, Utils.getObject(config.serializerClass)))
+
+ /**
+ * This constructor can be used to provide pre-instantiated objects for all config parameters
+ * that would otherwise be instantiated via reflection, i.e. encoder, partitioner, event handler and
+ * callback handler
+ * @param config Producer Configuration object
+ * @param encoder Encoder used to convert an object of type V to a kafka.message.Message
+ * @param eventHandler the class that implements kafka.javaapi.producer.async.IEventHandler[T] used to
+ * dispatch a batch of produce requests, using an instance of kafka.javaapi.producer.SyncProducer
+ * @param cbkHandler the class that implements kafka.javaapi.producer.async.CallbackHandler[T] used to inject
+ * callbacks at various stages of the kafka.javaapi.producer.AsyncProducer pipeline.
+ * @param partitioner class that implements the kafka.javaapi.producer.Partitioner[K], used to supply a custom
+ * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
+ * object in the send API
+ */
+ import kafka.javaapi.Implicits._
+ def this(config: ProducerConfig,
+ encoder: Encoder[V],
+ eventHandler: kafka.javaapi.producer.async.EventHandler[V],
+ cbkHandler: kafka.javaapi.producer.async.CallbackHandler[V],
+ partitioner: Partitioner[K]) = {
+ this(config, partitioner,
+ new ProducerPool[V](config, encoder,
+ new kafka.producer.async.EventHandler[V] {
+ override def init(props: Properties) { eventHandler.init(props) }
+ override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer,
+ encoder: Encoder[V]) {
+ import collection.JavaConversions._
+ import kafka.javaapi.Implicits._
+ eventHandler.handle(asList(events), producer, encoder)
+ }
+ override def close { eventHandler.close }
+ },
+ new kafka.producer.async.CallbackHandler[V] {
+ import collection.JavaConversions._
+ override def init(props: Properties) { cbkHandler.init(props)}
+ override def beforeEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]]): QueueItem[V] = {
+ cbkHandler.beforeEnqueue(data)
+ }
+ override def afterEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]], added: Boolean) {
+ cbkHandler.afterEnqueue(data, added)
+ }
+ override def afterDequeuingExistingData(data: QueueItem[V] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
+ cbkHandler.afterDequeuingExistingData(data)
+ }
+ override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
+ asList(cbkHandler.beforeSendingData(asList(data)))
+ }
+ override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = {
+ asBuffer(cbkHandler.lastBatchBeforeClose)
+ }
+ override def close { cbkHandler.close }
+ }))
+ }
+
+ /**
+ * Sends the data to a single topic, partitioned by key, using either the
+ * synchronous or the asynchronous producer
+ * @param producerData the producer data object that encapsulates the topic, key and message data
+ */
+ def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
+ import collection.JavaConversions._
+ underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
+ asBuffer(producerData.getData)))
+ }
+
+ /**
+ * Use this API to send data to multiple topics
+ * @param producerData list of producer data objects that encapsulate the topic, key and message data
+ */
+ def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
+ import collection.JavaConversions._
+ underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
+ asBuffer(pd.getData))): _*)
+ }
+
+ /**
+ * Close API to close the producer pool connections to all Kafka brokers. Also closes
+ * the zookeeper client connection if one exists
+ */
+ def close = underlying.close
+}
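For orientation, a minimal Java sketch of the high-level Producer wrapper above: build a ProducerConfig from Properties, wrap the payload in ProducerData, and call the single-topic send(). The property names and the StringEncoder class are assumptions about producer configuration, not definitions from this patch.

    import java.util.Arrays;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.javaapi.producer.ProducerData;
    import kafka.producer.ProducerConfig;

    public class ProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("serializer.class", "kafka.serializer.StringEncoder");  // assumed key/value
            props.put("zk.connect", "localhost:2181");                        // assumed key

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            try {
                // the key is handed to the configured Partitioner to pick a partition
                producer.send(new ProducerData<String, String>(
                    "test-topic", "key-1", Arrays.asList("message-1", "message-2")));
            } finally {
                producer.close();
            }
        }
    }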
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.producer
+
+import scala.collection.JavaConversions._
+
+class ProducerData[K, V](private val topic: String,
+ private val key: K,
+ private val data: java.util.List[V]) {
+
+ def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
+
+ def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))
+
+ def getTopic: String = topic
+
+ def getKey: K = key
+
+ def getData: java.util.List[V] = data
+}
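The two auxiliary constructors above cover the keyless cases; as a sketch (assuming a String payload), the following are equivalent ways to wrap a single unkeyed message, and getKey() returns null for both:

    ProducerData<String, String> single =
        new ProducerData<String, String>("test-topic", "message-1");
    ProducerData<String, String> batch =
        new ProducerData<String, String>("test-topic", java.util.Arrays.asList("message-1"));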
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi.producer
+
+import kafka.producer.SyncProducerConfig
+import kafka.javaapi.message.ByteBufferMessageSet
+
+class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
+
+ def this(config: SyncProducerConfig) = this(new kafka.producer.SyncProducer(config))
+
+ val underlying = syncProducer
+
+ def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
+ import kafka.javaapi.Implicits._
+ underlying.send(topic, partition, messages)
+ }
+
+ def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
+ kafka.api.ProducerRequest.RandomPartition,
+ messages)
+
+ def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
+ import kafka.javaapi.Implicits._
+ val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
+ for(i <- 0 until produces.length)
+ produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
+ underlying.multiSend(produceRequests)
+ }
+
+ def close() {
+ underlying.close
+ }
+}
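A short sketch of the blocking path through the SyncProducer wrapper above. The SyncProducerConfig property names and the Message(byte[]) constructor are assumptions not defined in this file; the single-argument send() shown above routes to a random partition of the topic.

    import java.util.Arrays;
    import java.util.Properties;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.javaapi.producer.SyncProducer;
    import kafka.message.Message;
    import kafka.producer.SyncProducerConfig;

    public class SyncProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("host", "localhost");   // assumed key
            props.put("port", "9092");        // assumed key

            SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
            try {
                ByteBufferMessageSet messages = new ByteBufferMessageSet(
                    Arrays.asList(new Message("payload".getBytes())));  // Message(byte[]) assumed
                // single-argument send() forwards to a random partition, per the wrapper above
                producer.send("test-topic", messages);
            } finally {
                producer.close();
            }
        }
    }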
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.producer.async;
+
+import kafka.producer.async.QueueItem;
+
+import java.util.Properties;
+
+/**
+ * Callback handler APIs for use in the async producer. The purpose is to
+ * give the user some callback handles to insert custom functionality at
+ * various stages as the data flows through the pipeline of the async producer
+ */
+public interface CallbackHandler<T> {
+ /**
+ * Initializes the callback handler using a Properties object
+ * @param props the properties used to initialize the callback handler
+ */
+ public void init(Properties props);
+
+ /**
+ * Callback to process the data before it enters the batching queue
+ * of the asynchronous producer
+ * @param data the data sent to the producer
+ * @return the processed data that enters the queue
+ */
+ public QueueItem<T> beforeEnqueue(QueueItem<T> data);
+
+ /**
+ * Callback to process the data just after it enters the batching queue
+ * of the asynchronous producer
+ * @param data the data sent to the producer
+ * @param added flag that indicates if the data was successfully added to the queue
+ */
+ public void afterEnqueue(QueueItem<T> data, boolean added);
+
+ /**
+ * Callback to process the data item right after it has been dequeued by the
+ * background sender thread of the asynchronous producer
+ * @param data the data item dequeued from the async producer queue
+ * @return the processed list of data items that gets added to the data handled by the event handler
+ */
+ public java.util.List<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> data);
+
+ /**
+ * Callback to process the batched data right before it is being processed by the
+ * handle API of the event handler
+ * @param data the batched data received by the event handler
+ * @return the processed batched data that gets processed by the handle() API of the event handler
+ */
+ public java.util.List<QueueItem<T>> beforeSendingData(java.util.List<QueueItem<T>> data);
+
+ /**
+ * Callback to process the last batch of data right before the producer send thread is shutdown
+ * @return the last batch of data that is sent to the EventHandler
+ */
+ public java.util.List<QueueItem<T>> lastBatchBeforeClose();
+
+ /**
+ * Cleans up and shuts down the callback handler
+ */
+ public void close();
+}
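Since CallbackHandler is a plain interface, a pass-through implementation that only counts enqueued items makes the hook points concrete. This is an illustrative sketch (class name and counting behaviour are made up); the async send thread, which is not part of this section, decides how the returned lists are merged back into the batch, so the sketch hands back the data it was given.

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicLong;
    import kafka.producer.async.QueueItem;

    public class CountingCallbackHandler<T> implements kafka.javaapi.producer.async.CallbackHandler<T> {
        private final AtomicLong enqueued = new AtomicLong(0);

        public void init(Properties props) { }

        // pass the item through unchanged before it enters the queue
        public QueueItem<T> beforeEnqueue(QueueItem<T> data) { return data; }

        public void afterEnqueue(QueueItem<T> data, boolean added) {
            if (added) enqueued.incrementAndGet();
        }

        // hand the dequeued item back unchanged (or nothing if none was dequeued)
        public List<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> data) {
            return data == null ? Collections.<QueueItem<T>>emptyList()
                                : Collections.singletonList(data);
        }

        // leave the batch as-is before the event handler sees it
        public List<QueueItem<T>> beforeSendingData(List<QueueItem<T>> data) { return data; }

        // nothing extra to flush on shutdown
        public List<QueueItem<T>> lastBatchBeforeClose() {
            return Collections.<QueueItem<T>>emptyList();
        }

        public void close() {
            System.out.println("enqueued " + enqueued.get() + " items");
        }
    }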
Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.producer.async;
+
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.async.QueueItem;
+import kafka.serializer.Encoder;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Handler that dispatches the batched data from the queue of the
+ * asynchronous producer.
+ */
+public interface EventHandler<T> {
+ /**
+ * Initializes the event handler using a Properties object
+ * @param props the properties used to initialize the event handler
+ */
+ public void init(Properties props);
+
+ /**
+ * Callback to dispatch the batched data and send it to a Kafka server
+ * @param events the data sent to the producer
+ * @param producer the low-level producer used to send the data
+ */
+ public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder);
+
+ /**
+ * Cleans up and shuts down the event handler
+ */
+ public void close();
+}
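To make the dispatch contract concrete, here is an illustrative EventHandler that encodes each queued item and sends it through the SyncProducer wrapper shown earlier. The QueueItem accessors (getData, getTopic, getPartition) and Encoder.toMessage are assumptions about classes outside this section; a real handler would batch items by topic and partition rather than sending one message set per item.

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.javaapi.producer.SyncProducer;
    import kafka.producer.async.QueueItem;
    import kafka.serializer.Encoder;

    public class NaiveEventHandler<T> implements kafka.javaapi.producer.async.EventHandler<T> {

        public void init(Properties props) { }

        // encode and send every item individually (no batching)
        public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder) {
            for (QueueItem<T> item : events) {
                ByteBufferMessageSet messages = new ByteBufferMessageSet(
                    Collections.singletonList(encoder.toMessage(item.getData())));  // assumed accessors
                producer.send(item.getTopic(), item.getPartition(), messages);
            }
        }

        public void close() { }
    }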