kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: fix StreamTask.close()
Date Wed, 25 Nov 2015 20:04:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 87b894d68 -> 617a91a23


HOTFIX: fix StreamTask.close()

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #586 from ymatsuda/fix_streamtask_close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/617a91a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/617a91a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/617a91a2

Branch: refs/heads/trunk
Commit: 617a91a236a44431aeca9345ae954f7067da48f3
Parents: 87b894d
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Nov 25 12:04:24 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 25 12:04:24 2015 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/AbstractTask.java       | 7 ++++++-
 .../apache/kafka/streams/processor/internals/StreamTask.java  | 5 +++++
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/617a91a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 64bb10d..14037ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 public abstract class AbstractTask {
@@ -84,10 +85,14 @@ public abstract class AbstractTask {
 
     public void close() {
         try {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(recordCollectorOffsets());
         } catch (IOException e) {
             throw new KafkaException("Error while closing the state manager in processor
context", e);
         }
     }
 
+    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+        return Collections.emptyMap();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/617a91a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5d170f8..16f0667 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -304,6 +304,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
         super.close();
     }
 
+    @Override
+    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+        return recordCollector.offsets();
+    }
+
     private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
         return new RecordQueue(partition, source);
     }


Mime
View raw message