kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: HOTFIX: fix off-by-one stream offset commit
Date Tue, 27 Oct 2015 20:44:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c6c4f5070 -> 38a1b6055


HOTFIX: fix off-by-one stream offset commit

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #372 from ymatsuda/commit_offset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38a1b605
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38a1b605
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38a1b605

Branch: refs/heads/trunk
Commit: 38a1b605533632f61f4c23b69933eb496098311b
Parents: c6c4f50
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Oct 27 13:49:19 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 27 13:49:19 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamTask.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/38a1b605/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d83d721..f01e00b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -271,7 +271,7 @@ public class StreamTask implements Punctuator {
         if (commitOffsetNeeded) {
             Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
             for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
+                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
             }
             consumer.commitSync(consumedOffsetsAndMetadata);
             commitOffsetNeeded = false;
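
Note on the change above: a Kafka consumer's committed offset is expected to be the offset of the next record to read, not the offset of the last record processed. Committing the last processed offset unchanged would cause that record to be read again after a restart. The following is a minimal sketch of that arithmetic only, not the StreamTask code. It uses plain String keys in place of TopicPartition and plain Long values in place of OffsetAndMetadata, and the class and method names (CommitOffsetSketch, toCommitOffsets) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitOffsetSketch {

    // Hypothetical helper: maps "last processed offset" per partition to the
    // "next offset to read", which is the value that should be committed.
    static Map<String, Long> toCommitOffsets(Map<String, Long> consumedOffsets) {
        Map<String, Long> offsetsToCommit = new HashMap<>(consumedOffsets.size());
        for (Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            // +1 because the committed position points at the next record.
            offsetsToCommit.put(entry.getKey(), entry.getValue() + 1L);
        }
        return offsetsToCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> consumedOffsets = new HashMap<>();
        consumedOffsets.put("topic-0", 41L); // last record processed was at offset 41

        Map<String, Long> offsetsToCommit = toCommitOffsets(consumedOffsets);
        System.out.println(offsetsToCommit.get("topic-0")); // prints 42
    }
}
```

With the pre-fix behavior the sketch would commit 41, so a restarted task would resume at offset 41 and process that record a second time.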

