kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4317: Regularly checkpoint StateStore changelog offsets
Date Fri, 17 Feb 2017 22:41:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fc1cfe475 -> 1f1e794ad


KAFKA-4317: Regularly checkpoint StateStore changelog offsets

Currently the checkpoint file is deleted at state store initialization and it is only ever written again during a clean shutdown. This can result in significant delays during restarts as the entire store needs to be loaded from the changelog.
We can mitigate against this by frequently checkpointing the offsets. The checkpointing happens only during the commit phase, i.e, after we have manually flushed the store and the producer. So we guarantee that the checkpointed offsets are never greater than what has been flushed.
In the event of hard failure we can recover by reading the checkpoints and consuming from the stored offsets.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2471 from dguy/kafka-4317


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f1e794a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f1e794a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f1e794a

Branch: refs/heads/trunk
Commit: 1f1e794ad0025bb2a33b7d8378a481f224b3bccc
Parents: fc1cfe4
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Feb 17 14:41:28 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Feb 17 14:41:28 2017 -0800

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |  18 +-
 .../processor/internals/Checkpointable.java     |  27 +++
 .../internals/GlobalStateManagerImpl.java       |  16 +-
 .../internals/GlobalStateUpdateTask.java        |   3 +-
 .../internals/ProcessorStateManager.java        |  57 +++---
 .../processor/internals/StandbyTask.java        |  23 +--
 .../processor/internals/StateManager.java       |   4 +-
 .../streams/processor/internals/StreamTask.java |   5 +-
 .../state/internals/InMemoryKeyValueStore.java  |   2 +-
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../internals/GlobalStateManagerImplTest.java   |  49 ++++-
 .../internals/GlobalStateTaskTest.java          |  16 +-
 .../internals/ProcessorStateManagerTest.java    | 186 +++++++++++++------
 .../processor/internals/StandbyTaskTest.java    |  51 ++++-
 .../processor/internals/StateManagerStub.java   |   7 +-
 .../processor/internals/StreamTaskTest.java     |  60 ++++++
 .../kafka/test/GlobalStateManagerStub.java      |   7 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   3 +-
 18 files changed, 401 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 2a040ba..8de5d23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,14 +52,14 @@ public abstract class AbstractTask {
     /**
      * @throws ProcessorStateException if the state manager cannot be created
      */
-    protected AbstractTask(TaskId id,
-                           String applicationId,
-                           Collection<TopicPartition> partitions,
-                           ProcessorTopology topology,
-                           Consumer<byte[], byte[]> consumer,
-                           Consumer<byte[], byte[]> restoreConsumer,
-                           boolean isStandby,
-                           StateDirectory stateDirectory,
+    protected AbstractTask(final TaskId id,
+                           final String applicationId,
+                           final Collection<TopicPartition> partitions,
+                           final ProcessorTopology topology,
+                           final Consumer<byte[], byte[]> consumer,
+                           final Consumer<byte[], byte[]> restoreConsumer,
+                           final boolean isStandby,
+                           final StateDirectory stateDirectory,
                            final ThreadCache cache) {
         this.id = id;
         this.applicationId = applicationId;
@@ -70,7 +70,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
+            stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
         } catch (IOException e) {
             throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
new file mode 100644
index 0000000..7b02d5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+// Interface to indicate that an object has associated partition offsets that can be checkpointed
+interface Checkpointable {
+    void checkpoint(final Map<TopicPartition, Long> offsets);
+    Map<TopicPartition, Long> checkpointed();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 7534993..3819bb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -58,11 +58,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final File baseDir;
     private final OffsetCheckpoint checkpoint;
     private final Set<String> globalStoreNames = new HashSet<>();
-    private HashMap<TopicPartition, Long> checkpointableOffsets;
+    private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
 
     public GlobalStateManagerImpl(final ProcessorTopology topology,
-                           final Consumer<byte[], byte[]> consumer,
-                           final StateDirectory stateDirectory) {
+                                  final Consumer<byte[], byte[]> consumer,
+                                  final StateDirectory stateDirectory) {
         this.topology = topology;
         this.consumer = consumer;
         this.stateDirectory = stateDirectory;
@@ -81,8 +81,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         }
 
         try {
-            this.checkpointableOffsets = new HashMap<>(checkpoint.read());
-            checkpoint.delete();
+            this.checkpointableOffsets.putAll(checkpoint.read());
         } catch (IOException e) {
             try {
                 stateDirectory.unlockGlobalState();
@@ -220,13 +219,14 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
             if (closeFailed.length() > 0) {
                 throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed);
             }
-            writeCheckpoints(offsets);
+            checkpoint(offsets);
         } finally {
             stateDirectory.unlockGlobalState();
         }
     }
 
-    private void writeCheckpoints(final Map<TopicPartition, Long> offsets) {
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
         if (!offsets.isEmpty()) {
             checkpointableOffsets.putAll(offsets);
             try {
@@ -238,7 +238,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return Collections.unmodifiableMap(checkpointableOffsets);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 40f2a3c..6da37e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -67,7 +67,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         }
         initTopology();
         processorContext.initialized();
-        return stateMgr.checkpointedOffsets();
+        return stateMgr.checkpointed();
     }
 
 
@@ -89,6 +89,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
 
     public void flushState() {
         stateMgr.flush(processorContext);
+        stateMgr.checkpoint(offsets);
     }
 
     public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1c786e3..0e8caa2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -68,6 +68,7 @@ public class ProcessorStateManager implements StateManager {
     // TODO: this map does not work with customized grouper where multiple partitions
     // of the same topic can be assigned to the same topic.
     private final Map<String, TopicPartition> partitionForTopic;
+    private final OffsetCheckpoint checkpoint;
 
     /**
      * @throws LockException if the state directory cannot be locked because another thread holds the lock
@@ -103,11 +104,8 @@ public class ProcessorStateManager implements StateManager {
         }
 
         // load the checkpoint information
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+        checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
         this.checkpointedOffsets = new HashMap<>(checkpoint.read());
-
-        // delete the checkpoint file after finish loading its stored offsets
-        checkpoint.delete();
     }
 
 
@@ -250,7 +248,7 @@ public class ProcessorStateManager implements StateManager {
         }
     }
 
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
 
         for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
@@ -347,29 +345,7 @@ public class ProcessorStateManager implements StateManager {
                 }
 
                 if (ackedOffsets != null) {
-                    Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
-                    for (String storeName : stores.keySet()) {
-                        // only checkpoint the offset to the offsets file if
-                        // it is persistent AND changelog enabled
-                        if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
-                            String changelogTopic = storeToChangelogTopic.get(storeName);
-                            TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
-                            Long offset = ackedOffsets.get(topicPartition);
-
-                            if (offset != null) {
-                                // store the last offset + 1 (the log position after restoration)
-                                checkpointOffsets.put(topicPartition, offset + 1);
-                            } else {
-                                // if no record was produced. we need to check the restored offset.
-                                offset = restoredOffsets.get(topicPartition);
-                                if (offset != null)
-                                    checkpointOffsets.put(topicPartition, offset);
-                            }
-                        }
-                    }
-                    // write the checkpoint file before closing, to indicate clean shutdown
-                    OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
-                    checkpoint.write(checkpointOffsets);
+                    checkpoint(ackedOffsets);
                 }
 
             }
@@ -379,6 +355,31 @@ public class ProcessorStateManager implements StateManager {
         }
     }
 
+    // write the checkpoint
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
+        for (String storeName : stores.keySet()) {
+            // only checkpoint the offset to the offsets file if
+            // it is persistent AND changelog enabled
+            if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
+                final String changelogTopic = storeToChangelogTopic.get(storeName);
+                final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
+                if (ackedOffsets.containsKey(topicPartition)) {
+                    // store the last offset + 1 (the log position after restoration)
+                    checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
+                } else if (restoredOffsets.containsKey(topicPartition)) {
+                    checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
+                }
+            }
+        }
+        // write the checkpoint file before closing, to indicate clean shutdown
+        try {
+            checkpoint.write(checkpointedOffsets);
+        } catch (IOException e) {
+            log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e);
+        }
+    }
+
     private int getPartition(String topic) {
         TopicPartition partition = partitionForTopic.get(topic);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4437a19..a27098c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -51,14 +51,15 @@ public class StandbyTask extends AbstractTask {
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
      */
-    public StandbyTask(TaskId id,
-                       String applicationId,
-                       Collection<TopicPartition> partitions,
-                       ProcessorTopology topology,
-                       Consumer<byte[], byte[]> consumer,
-                       Consumer<byte[], byte[]> restoreConsumer,
-                       StreamsConfig config,
-                       StreamsMetrics metrics, final StateDirectory stateDirectory) {
+    public StandbyTask(final TaskId id,
+                       final String applicationId,
+                       final Collection<TopicPartition> partitions,
+                       final ProcessorTopology topology,
+                       final Consumer<byte[], byte[]> consumer,
+                       final Consumer<byte[], byte[]> restoreConsumer,
+                       final StreamsConfig config,
+                       final StreamsMetrics metrics,
+                       final StateDirectory stateDirectory) {
         super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null);
 
         // initialize the topology with its own context
@@ -67,9 +68,9 @@ public class StandbyTask extends AbstractTask {
         log.info("standby-task [{}] Initializing state stores", id());
         initializeStateStores();
 
-        ((StandbyContextImpl) this.processorContext).initialized();
+        this.processorContext.initialized();
 
-        this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+        this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
     }
 
     public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -92,7 +93,7 @@ public class StandbyTask extends AbstractTask {
     public void commit() {
         log.debug("standby-task [{}] Committing its state", id());
         stateMgr.flush(processorContext);
-
+        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
         // reinitialize offset limits
         initializeOffsetLimits();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 7343c85..3102b77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -24,7 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
-interface StateManager {
+interface StateManager extends Checkpointable {
     File baseDir();
 
     void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback);
@@ -36,6 +36,4 @@ interface StateManager {
     StateStore getGlobalStore(final String name);
 
     StateStore getStore(final String name);
-
-    Map<TopicPartition, Long> checkpointedOffsets();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index be77856..d95ac4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -76,8 +76,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
             log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
             // 2) flush produced records in the downstream and change logs of local states
             recordCollector.flush();
-
-            // 3) commit consumed offsets if it is dirty already
+            // 3) write checkpoints for any local state
+            stateMgr.checkpoint(recordCollectorOffsets());
+            // 4) commit consumed offsets if it is dirty already
             commitOffsets();
         }
     };

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index fe50152..28dea79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -39,7 +39,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     private StateSerdes<K, V> serdes;
 
-    InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
+    public InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
         this.name = name;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index fc0953b..f288f98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -60,6 +60,7 @@ public class AbstractTaskTest {
     }
 
     private AbstractTask createTask(final Consumer consumer) {
+        final MockTime time = new MockTime();
         return new AbstractTask(new TaskId(0, 0),
                                 "app",
                                 Collections.singletonList(new TopicPartition("t", 0)),
@@ -72,7 +73,7 @@ public class AbstractTaskTest {
                                 consumer,
                                 consumer,
                                 false,
-                                new StateDirectory("app", TestUtils.tempDirectory().getPath(), new MockTime()),
+                                new StateDirectory("app", TestUtils.tempDirectory().getPath(), time),
                                 new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public void commit() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 168b300..062079f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -50,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -125,15 +127,15 @@ public class GlobalStateManagerImplTest {
         final Map<TopicPartition, Long> expected = writeCheckpoint();
 
         stateManager.initialize(context);
-        final Map<TopicPartition, Long> offsets = stateManager.checkpointedOffsets();
+        final Map<TopicPartition, Long> offsets = stateManager.checkpointed();
         assertEquals(expected, offsets);
     }
 
     @Test
-    public void shouldDeleteCheckpointFileAfteLoaded() throws Exception {
+    public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception {
         writeCheckpoint();
         stateManager.initialize(context);
-        assertFalse(checkpointFile.exists());
+        assertTrue(checkpointFile.exists());
     }
 
     @Test(expected = StreamsException.class)
@@ -168,7 +170,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldThrowIllegalArgumenExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
         stateManager.initialize(context);
         initializeConsumer(2, 1, t1);
         stateManager.register(store1, false, new TheStateRestoreCallback());
@@ -271,9 +273,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store1, false, stateRestoreCallback);
         final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
         stateManager.close(expected);
-        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
-                                                                                ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        final Map<TopicPartition, Long> result = offsetCheckpoint.read();
+        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
         assertEquals(expected, result);
     }
 
@@ -377,6 +377,41 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
+    public void shouldCheckpointOffsets() throws Exception {
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+        stateManager.initialize(context);
+
+        stateManager.checkpoint(offsets);
+
+        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
+        assertThat(result, equalTo(offsets));
+        assertThat(stateManager.checkpointed(), equalTo(offsets));
+    }
+
+    @Test
+    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        initializeConsumer(10, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        initializeConsumer(20, 1, t2);
+        stateManager.register(store2, false, stateRestoreCallback);
+
+        final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
+        stateManager.checkpoint(Collections.singletonMap(t1, 101L));
+
+        final Map<TopicPartition, Long> updatedCheckpoint = stateManager.checkpointed();
+        assertThat(updatedCheckpoint.get(t2), equalTo(initialCheckpoint.get(t2)));
+        assertThat(updatedCheckpoint.get(t1), equalTo(101L));
+    }
+
+    private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
+                                                                                ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        return offsetCheckpoint.read();
+    }
+
+    @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws Exception {
         stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId", stateDirPath, time) {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index df0b73c..66999bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -38,6 +38,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -137,7 +139,19 @@ public class GlobalStateTaskTest {
         globalStateTask.initialize();
         globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.close();
-        assertEquals(expectedOffsets, stateMgr.checkpointedOffsets());
+        assertEquals(expectedOffsets, stateMgr.checkpointed());
         assertTrue(stateMgr.closed);
     }
+
+    @Test
+    public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception {
+        final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(t1, 102L);
+        expectedOffsets.put(t2, 100L);
+        globalStateTask.initialize();
+        globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.flushState();
+        assertThat(stateMgr.checkpointed(), equalTo(expectedOffsets));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index b8d51ba..f1d3090 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -58,6 +59,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -71,7 +74,6 @@ public class ProcessorStateManagerTest {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
         private TopicPartition assignedPartition = null;
-        private TopicPartition seekPartition = null;
         private long seekOffset = -1L;
         private boolean seekToBeginingCalled = false;
         private boolean seekToEndCalled = false;
@@ -162,7 +164,6 @@ public class ProcessorStateManagerTest {
             if (seekOffset >= 0)
                 throw new IllegalStateException("RestoreConsumer: offset already seeked");
 
-            seekPartition = partition;
             seekOffset = offset;
             currentOffset = offset;
             super.seek(partition, offset);
@@ -203,6 +204,9 @@ public class ProcessorStateManagerTest {
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
     private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
+    private final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
+    private final MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+    private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1);
     private final String storeName = "mockStateStore";
     private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
     private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0);
@@ -210,6 +214,8 @@ public class ProcessorStateManagerTest {
     private final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
     private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName, true);
     private File baseDir;
+    private File checkpointFile;
+    private OffsetCheckpoint checkpoint;
     private StateDirectory stateDirectory;
 
 
@@ -217,6 +223,14 @@ public class ProcessorStateManagerTest {
     public void setup() {
         baseDir = TestUtils.tempDirectory();
         stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime());
+        checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        checkpoint = new OffsetCheckpoint(checkpointFile);
+        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
     }
 
     @After
@@ -300,8 +314,6 @@ public class ProcessorStateManagerTest {
     public void testRegisterNonPersistentStore() throws IOException {
         long lastCheckpointedOffset = 10L;
 
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
         checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
 
@@ -313,8 +325,6 @@ public class ProcessorStateManagerTest {
         TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
-        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
-
         ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
@@ -325,7 +335,7 @@ public class ProcessorStateManagerTest {
             restoreConsumer.reset();
 
             ArrayList<Integer> expectedKeys = new ArrayList<>();
-            long offset = -1L;
+            long offset;
             for (int i = 1; i <= 3; i++) {
                 offset = (long) (i + 100);
                 int key = i;
@@ -346,12 +356,13 @@ public class ProcessorStateManagerTest {
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
-
     }
 
     @Test
     public void testChangeLogOffsets() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(
+                new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
         long lastCheckpointedOffset = 10L;
         String storeName1 = "store1";
         String storeName2 = "store2";
@@ -366,10 +377,7 @@ public class ProcessorStateManagerTest {
         storeToChangelogTopic.put(storeName2, storeTopicName2);
         storeToChangelogTopic.put(storeName3, storeTopicName3);
 
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
-
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+        offsetCheckpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
 
         restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
                 new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
@@ -406,7 +414,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(store2, true, store2.stateRestoreCallback);
             stateMgr.register(store3, true, store3.stateRestoreCallback);
 
-            Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
+            Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
 
             assertEquals(3, changeLogOffsets.size());
             assertTrue(changeLogOffsets.containsKey(partition1));
@@ -424,20 +432,12 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testGetStore() throws IOException {
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
-
-        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-
-        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
         try {
-            stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
 
             assertNull(stateMgr.getStore("noSuchStore"));
-            assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
+            assertEquals(nonPersistentStore, stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -446,30 +446,15 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testFlushAndClose() throws IOException {
-        final TaskId taskId = new TaskId(0, 1);
-        File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         // write an empty checkpoint file
-        OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
-        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
-
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
-        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
         // set up ack'ed offsets
-        HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
+        final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
         ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
         ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
         ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
 
-        MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-
         ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
@@ -477,8 +462,8 @@ public class ProcessorStateManagerTest {
             }
         });
         try {
-            // make sure the checkpoint file is deleted
-            assertFalse(checkpointFile.exists());
+            // make sure the checkpoint file isn't deleted
+            assertTrue(checkpointFile.exists());
 
             restoreConsumer.reset();
             stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
@@ -499,39 +484,122 @@ public class ProcessorStateManagerTest {
         assertTrue(checkpointFile.exists());
 
         // the checkpoint file should contain an offset from the persistent store only.
-        OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
-        Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
+        final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
         assertEquals(1, checkpointedOffsets.size());
         assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
     }
 
     @Test
     public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
-        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
         ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
-        stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
+        stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
 
     @Test
-    public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception {
-        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
-        // write an empty checkpoint file
-        final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
-        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
+    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception {
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(persistentStorePartition, 99L);
+        checkpoint.write(offsets);
 
-        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         false,
+                                                                         stateDirectory,
+                                                                         Collections.<String, String>emptyMap());
+
+        restoreConsumer.reset();
+        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.close(null);
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(offsets));
+    }
+
+    @Test
+    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception {
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         false,
+                                                                         stateDirectory,
+                                                                         Collections.singletonMap(persistentStore.name(),
+                                                                                                  persistentStoreTopicName));
+        restoreConsumer.reset();
+        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
 
-        final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 11L)));
+    }
+
+    @Test
+    public void shouldWriteCheckpointForStandbyReplica() throws Exception {
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         true,
+                                                                         stateDirectory,
+                                                                         Collections.singletonMap(persistentStore.name(),
+                                                                                                  persistentStoreTopicName));
 
         restoreConsumer.reset();
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
-        stateMgr.close(null);
-        assertFalse(checkpointFile.exists());
+        final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
+        stateMgr.updateStandbyStates(persistentStorePartition,
+                                     Collections.singletonList(
+                                             new ConsumerRecord<>(persistentStorePartition.topic(),
+                                                                                persistentStorePartition.partition(),
+                                                                                888L,
+                                                                                bytes,
+                                                                                bytes)));
+
+        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L)));
+
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
+        final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
+
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         true,
+                                                                         stateDirectory,
+                                                                         Collections.singletonMap(nonPersistentStoreName,
+                                                                                                  nonPersistentStoreTopicName));
+
+        restoreConsumer.reset();
+        stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+        stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception {
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         true,
+                                                                         stateDirectory,
+                                                                         Collections.<String, String>emptyMap());
+
+        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+
+        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 629e521..ef4ebcc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -54,6 +55,8 @@ import java.util.Properties;
 import java.util.Set;
 
 import static java.util.Collections.singleton;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -316,7 +319,7 @@ public class StandbyTaskTest {
         final String changelogName = "test-application-my-store-changelog";
         final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
         consumer.assign(partitions);
-        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
         consumer.commitSync(committedOffsets);
 
@@ -327,9 +330,53 @@ public class StandbyTaskTest {
         final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
         StreamsConfig config = createConfig(baseDir);
         new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config,
-            new MockStreamsMetrics(new Metrics()), stateDirectory);
+                        new MockStreamsMetrics(new Metrics()), stateDirectory);
 
     }
+
+    @Test
+    public void shouldCheckpointStoreOffsetsOnCommit() throws Exception {
+        consumer.assign(Utils.mkList(ktable));
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L));
+        consumer.commitSync(committedOffsets);
+
+        restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
+                new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0])));
+
+        final TaskId taskId = new TaskId(0, 0);
+        final MockTime time = new MockTime();
+        final StreamsConfig config = createConfig(baseDir);
+        final StandbyTask task = new StandbyTask(taskId,
+                                                 applicationId,
+                                                 ktablePartitions,
+                                                 ktableTopology,
+                                                 consumer,
+                                                 restoreStateConsumer,
+                                                 config,
+                                                 null,
+                                                 stateDirectory
+        );
+
+
+        restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+        final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
+        task.update(ktable, Collections.singletonList(new ConsumerRecord<>(ktable.topic(),
+                                                                           ktable.partition(),
+                                                                           50L,
+                                                                           serializedValue,
+                                                                           serializedValue)));
+
+        time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+        task.commit();
+
+        final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
+                                                                                   ProcessorStateManager.CHECKPOINT_FILE_NAME)).read();
+        assertThat(checkpoint, equalTo(Collections.singletonMap(ktable, 51L)));
+
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index 3f48059..f4aca9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -57,7 +57,12 @@ public class StateManagerStub implements StateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return null;
     }
+
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 15b1d25..5c72fc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
@@ -39,6 +41,8 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
@@ -415,6 +419,62 @@ public class StreamTaskTest {
         assertTrue(flushed.get());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCheckpointOffsetsOnCommit() throws Exception {
+        final String storeName = "test";
+        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);
+        final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
+            @Override
+            public void init(final ProcessorContext context, final StateStore root) {
+                context.register(root, true, null);
+            }
+
+            @Override
+            public boolean persistent() {
+                return true;
+            }
+        };
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                                                 Collections.<String, SourceNode>emptyMap(),
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>singletonList(inMemoryStore),
+                                                                 Collections.singletonMap(storeName, changelogTopic),
+                                                                 Collections.<StateStore>emptyList());
+
+        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
+        final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
+            @Override
+            public Map<TopicPartition, Long> offsets() {
+
+                return Collections.singletonMap(partition, 543L);
+            }
+        };
+
+        restoreStateConsumer.updatePartitions(changelogTopic,
+                                              Collections.singletonList(
+                                                      new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0])));
+        restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
+        restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
+
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final TaskId taskId = new TaskId(0, 0);
+        final MockTime time = new MockTime();
+        final StreamsConfig config = createConfig(baseDir);
+        final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer,
+                                                     restoreStateConsumer, config, streamsMetrics,
+                                                     stateDirectory, new ThreadCache("testCache", 0, streamsMetrics),
+                                                     time, recordCollector);
+
+        time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+
+        streamTask.commit();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
+                                                                          ProcessorStateManager.CHECKPOINT_FILE_NAME));
+
+        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L)));
+    }
+
     @Test
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception {
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index 2f3ef26..612a0da 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -67,6 +67,11 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     }
 
     @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
+        this.offsets.putAll(offsets);
+    }
+
+    @Override
     public StateStore getGlobalStore(final String name) {
         return null;
     }
@@ -77,7 +82,7 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return offsets;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index b50ff34..ac8933d 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -211,7 +211,8 @@ public class ProcessorTopologyTestDriver {
             final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
-                                                        stateManager);
+                                                        stateManager
+            );
             globalStateTask.initialize();
         }
 


Mime
View raw message