kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-1764; ZookeeperConsumerConnector should not put multiple shutdown commands to the same data chunk queue; reviewed by Joel Koshy and Guozhang Wang
Date Fri, 14 Nov 2014 18:05:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8c35030e4 -> ca2cf97a6


KAFKA-1764; ZookeeperConsumerConnector should not put multiple shutdown commands to the same
data chunk queue; reviewed by Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca2cf97a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca2cf97a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca2cf97a

Branch: refs/heads/trunk
Commit: ca2cf97a6b151566f091a5dd016b93dfdaf87628
Parents: 8c35030
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Nov 14 10:02:45 2014 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Nov 14 10:02:45 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerIterator.scala          | 1 -
 .../src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +-
 core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala | 2 +-
 3 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ca2cf97a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index ac491b4..78fbf75 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -71,7 +71,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
       }
       if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
         debug("Received the shutdown command")
-        channel.offer(currentDataChunk)
         return allDone
       } else {
         currentTopicInfo = currentDataChunk.topicInfo

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca2cf97a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbc680f..f476973 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -256,7 +256,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   private def sendShutdownToAllQueues() = {
-    for (queue <- topicThreadIdAndQueues.values) {
+    for (queue <- topicThreadIdAndQueues.values.toSet[BlockingQueue[FetchedDataChunk]])
{
       debug("Clearing up queue")
       queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca2cf97a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 151ba7c..c0355cc 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -80,7 +80,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness
{
     val receivedMessages = (0 until 5).map(i => iter.next.message).toList
 
     assertFalse(iter.hasNext)
-    assertEquals(1, queue.size) // This is only the shutdown command.
+    assertEquals(0, queue.size) // Shutdown command has been consumed.
     assertEquals(5, receivedMessages.size)
     val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
     assertEquals(unconsumed, receivedMessages)


Mime
View raw message