kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1166424 - /incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
Date Wed, 07 Sep 2011 21:47:26 GMT
Author: junrao
Date: Wed Sep  7 21:47:26 2011
New Revision: 1166424

URL: http://svn.apache.org/viewvc?rev=1166424&view=rev
Log:
Hadoop Consumer goes into an infinite loop; patched by Sam William; reviewed by Richard Park;
KAFKA-131

Modified:
    incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java

Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1166424&r1=1166423&r2=1166424&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Wed Sep  7 21:47:26 2011
@@ -49,7 +49,7 @@ public class KafkaETLContext {
     final static int DEFAULT_TIMEOUT = 60000; // one minute
 
     final static KafkaETLKey DUMMY_KEY = new KafkaETLKey();
-    
+
     protected int _index; /*index of context*/
     protected String _input = null; /*input string*/
     protected KafkaETLRequest _request = null;
@@ -61,7 +61,7 @@ public class KafkaETLContext {
 
     protected MultiFetchResponse _response = null;  /*fetch response*/
     protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
-    
+    protected Iterator<ByteBufferMessageSet> _respIterator = null;
     protected int _retry = 0;
     protected long _requestTime = 0; /*accumulative request time*/
     protected long _startTime = -1;
@@ -125,7 +125,7 @@ public class KafkaETLContext {
     
     public boolean hasMore () {
         return _messageIt != null && _messageIt.hasNext() 
-                || _response != null && _response.iterator().hasNext()
+                || _response != null && _respIterator.hasNext()
                 || _offset < _offsetRange[1]; 
     }
     
@@ -135,9 +135,9 @@ public class KafkaETLContext {
         boolean gotNext = get(key, value);
 
         if(_response != null) {
-            Iterator<ByteBufferMessageSet> iter = _response.iterator();
-            while ( !gotNext && iter.hasNext()) {
-                ByteBufferMessageSet msgSet = iter.next();
+
+            while ( !gotNext && _respIterator.hasNext()) {
+                ByteBufferMessageSet msgSet = _respIterator.next();
                 if ( hasError(msgSet)) return false;
                 _messageIt =  (Iterator<MessageAndOffset>) msgSet.iterator();
                 gotNext = get(key, value);
@@ -156,6 +156,8 @@ public class KafkaETLContext {
 
         long tempTime = System.currentTimeMillis();
         _response = _consumer.multifetch(array);
+        if(_response != null)
+            _respIterator = _response.iterator();
         _requestTime += (System.currentTimeMillis() - tempTime);
         
         return true;
@@ -198,7 +200,7 @@ public class KafkaETLContext {
             
             key.set(_index, _offset, msgAndOffset.message().checksum());
             
-            _offset += msgAndOffset.offset();  //increase offset
+            _offset = msgAndOffset.offset();  //increase offset
             _count ++;  //increase count
             
             return true;



Mime
View raw message