kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-1764; ZookeeperConsumerConnector should not put multiple shutdown commands to the same data chunk queue; reviewed by Joel Koshy and Guozhang Wang
Date Fri, 14 Nov 2014 02:10:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2cd9ae745 -> d99af88ea


KAFKA-1764; ZookeeperConsumerConnector should not put multiple shutdown commands to the same
data chunk queue; reviewed by Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d99af88e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d99af88e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d99af88e

Branch: refs/heads/trunk
Commit: d99af88eafefbfd1c537a64f8107cd9041346015
Parents: 2cd9ae7
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Thu Nov 13 18:10:35 2014 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Nov 13 18:10:35 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerIterator.scala          | 1 -
 .../src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d99af88e/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index ac491b4..78fbf75 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -71,7 +71,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
       }
       if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
         debug("Received the shutdown command")
-        channel.offer(currentDataChunk)
         return allDone
       } else {
         currentTopicInfo = currentDataChunk.topicInfo

http://git-wip-us.apache.org/repos/asf/kafka/blob/d99af88e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbc680f..2402b45 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -256,7 +256,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   private def sendShutdownToAllQueues() = {
-    for (queue <- topicThreadIdAndQueues.values) {
+    for (queue <- topicThreadIdAndQueues.values.toSet) {
       debug("Clearing up queue")
       queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)


Mime
View raw message