kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1233501 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ConsumerIterator.scala main/scala/kafka/utils/IteratorTemplate.scala test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
Date Thu, 19 Jan 2012 17:53:32 GMT
Author: junrao
Date: Thu Jan 19 17:53:32 2012
New Revision: 1233501

URL: http://svn.apache.org/viewvc?rev=1233501&view=rev
Log:
ConsumerIterator throws an IllegalStateException after a ConsumerTimeout occurs; patched by
Jun Rao; reviewed by Joel Koshy; KAFKA-241

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1233501&r1=1233500&r2=1233501&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Jan 19 17:53:32 2012
@@ -58,6 +58,8 @@ class ConsumerIterator[T](private val to
       else {
         currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
         if (currentDataChunk == null) {
+          // reset state to make the iterator re-iterable
+          resetState()
           throw new ConsumerTimeoutException
         }
       }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1233501&r1=1233500&r2=1233501&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala Thu Jan 19 17:53:32 2012
@@ -72,6 +72,9 @@ abstract class IteratorTemplate[T] exten
   
   def remove = 
     throw new UnsupportedOperationException("Removal not supported")
-  
+
+  protected def resetState() {
+    state = NOT_READY
+  }
 }
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1233501&r1=1233500&r2=1233501&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Thu Jan 19 17:53:32 2012
@@ -63,15 +63,20 @@ class ZookeeperConsumerConnectorTest ext
     }
     val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-        println("This is ok")
-      case e => throw e
+
+    // no messages to consume, we should hit timeout;
+    // also the iterator should support re-entrant, so loop it twice
+    for (i <- 0 until  2) {
+      try {
+        getMessages(nMessages*2, topicMessageStreams0)
+        fail("should get an exception")
+      }
+      catch {
+        case e: ConsumerTimeoutException => // this is ok
+        case e => throw e
+      }
     }
+
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker



Mime
View raw message