kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5241: GlobalKTable should checkpoint offsets after restoring state
Date Tue, 16 May 2017 21:01:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 5719e8c9c -> e06cd3e55


KAFKA-5241: GlobalKTable should checkpoint offsets after restoring state

Ensure checkpointable offsets for GlobalKTables are always written on close.

Author: Tommy Becker <tobecker@tivo.com>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #3054 from twbecker/KAFKA-5241

(cherry picked from commit 73703a15c5006ddca166a458dcde72e17c91de4a)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e06cd3e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e06cd3e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e06cd3e5

Branch: refs/heads/0.10.2
Commit: e06cd3e55f25a0bb414e0770493906ea8019420a
Parents: 5719e8c
Author: Tommy Becker <tobecker@tivo.com>
Authored: Tue May 16 13:59:19 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue May 16 14:01:08 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/GlobalStateManagerImpl.java  |  4 ++--
 .../internals/GlobalStateManagerImplTest.java        | 15 ++++++++++++++-
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e06cd3e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 3819bb5..dd40ad7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -227,8 +227,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
 
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
-        if (!offsets.isEmpty()) {
-            checkpointableOffsets.putAll(offsets);
+        checkpointableOffsets.putAll(offsets);
+        if (!checkpointableOffsets.isEmpty()) {
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e06cd3e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 8c9cf19..6f06ad9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -408,6 +408,19 @@ public class GlobalStateManagerImplTest {
         assertThat(updatedCheckpoint.get(t1), equalTo(101L));
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        initializeConsumer(10, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+        final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
+        assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 11L)));
+        assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
+    }
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
                                                                                 ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -444,4 +457,4 @@ public class GlobalStateManagerImplTest {
             restored.add(KeyValue.pair(key, value));
         }
     }
-}
\ No newline at end of file
+}


Mime
View raw message