kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4317: Checkpoint state stores on commit interval
Date Fri, 12 May 2017 06:18:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 bce189fa4 -> 9eb0cdb54


KAFKA-4317: Checkpoint state stores on commit interval

This is a backport of https://github.com/apache/kafka/pull/2471

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3024 from dguy/k4881-bp


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9eb0cdb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9eb0cdb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9eb0cdb5

Branch: refs/heads/0.10.2
Commit: 9eb0cdb546a6ec7792cce12a0ceb6cd30afc88ad
Parents: bce189f
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu May 11 23:18:23 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu May 11 23:18:23 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |  19 +-
 .../processor/internals/Checkpointable.java     |  27 +++
 .../internals/GlobalStateManagerImpl.java       |  16 +-
 .../internals/GlobalStateUpdateTask.java        |   3 +-
 .../internals/ProcessorStateManager.java        |  58 +++---
 .../processor/internals/StandbyTask.java        |  23 +--
 .../processor/internals/StateManager.java       |   4 +-
 .../streams/processor/internals/StreamTask.java |   5 +-
 .../state/internals/InMemoryKeyValueStore.java  | 187 ++++++++++++++++++
 .../processor/internals/AbstractTaskTest.java   |   1 +
 .../internals/GlobalStateManagerImplTest.java   |  49 ++++-
 .../internals/GlobalStateTaskTest.java          |  16 +-
 .../internals/ProcessorStateManagerTest.java    | 198 +++++++++++++------
 .../processor/internals/StandbyTaskTest.java    |  49 ++++-
 .../processor/internals/StreamTaskTest.java     |  63 ++++++
 .../kafka/test/GlobalStateManagerStub.java      |   7 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   3 +-
 17 files changed, 588 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 55418d5..8de5d23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,14 +52,14 @@ public abstract class AbstractTask {
     /**
      * @throws ProcessorStateException if the state manager cannot be created
      */
-    protected AbstractTask(TaskId id,
-                           String applicationId,
-                           Collection<TopicPartition> partitions,
-                           ProcessorTopology topology,
-                           Consumer<byte[], byte[]> consumer,
-                           Consumer<byte[], byte[]> restoreConsumer,
-                           boolean isStandby,
-                           StateDirectory stateDirectory,
+    protected AbstractTask(final TaskId id,
+                           final String applicationId,
+                           final Collection<TopicPartition> partitions,
+                           final ProcessorTopology topology,
+                           final Consumer<byte[], byte[]> consumer,
+                           final Consumer<byte[], byte[]> restoreConsumer,
+                           final boolean isStandby,
+                           final StateDirectory stateDirectory,
                            final ThreadCache cache) {
         this.id = id;
         this.applicationId = applicationId;
@@ -70,8 +70,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
-
+            stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
         } catch (IOException e) {
             throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
new file mode 100644
index 0000000..7b02d5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+// Interface to indicate that an object has associated partition offsets that can be checkpointed
+interface Checkpointable {
+    void checkpoint(final Map<TopicPartition, Long> offsets);
+    Map<TopicPartition, Long> checkpointed();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 7534993..3819bb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -58,11 +58,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final File baseDir;
     private final OffsetCheckpoint checkpoint;
     private final Set<String> globalStoreNames = new HashSet<>();
-    private HashMap<TopicPartition, Long> checkpointableOffsets;
+    private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
 
     public GlobalStateManagerImpl(final ProcessorTopology topology,
-                           final Consumer<byte[], byte[]> consumer,
-                           final StateDirectory stateDirectory) {
+                                  final Consumer<byte[], byte[]> consumer,
+                                  final StateDirectory stateDirectory) {
         this.topology = topology;
         this.consumer = consumer;
         this.stateDirectory = stateDirectory;
@@ -81,8 +81,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         }
 
         try {
-            this.checkpointableOffsets = new HashMap<>(checkpoint.read());
-            checkpoint.delete();
+            this.checkpointableOffsets.putAll(checkpoint.read());
         } catch (IOException e) {
             try {
                 stateDirectory.unlockGlobalState();
@@ -220,13 +219,14 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
             if (closeFailed.length() > 0) {
                 throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed);
             }
-            writeCheckpoints(offsets);
+            checkpoint(offsets);
         } finally {
             stateDirectory.unlockGlobalState();
         }
     }
 
-    private void writeCheckpoints(final Map<TopicPartition, Long> offsets) {
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
         if (!offsets.isEmpty()) {
             checkpointableOffsets.putAll(offsets);
             try {
@@ -238,7 +238,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return Collections.unmodifiableMap(checkpointableOffsets);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 40f2a3c..6da37e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -67,7 +67,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         }
         initTopology();
         processorContext.initialized();
-        return stateMgr.checkpointedOffsets();
+        return stateMgr.checkpointed();
     }
 
 
@@ -89,6 +89,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
 
     public void flushState() {
         stateMgr.flush(processorContext);
+        stateMgr.checkpoint(offsets);
     }
 
     public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2ef9634..ea074da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -68,6 +68,7 @@ public class ProcessorStateManager implements StateManager {
     // TODO: this map does not work with customized grouper where multiple partitions
     // of the same topic can be assigned to the same task.
     private final Map<String, TopicPartition> partitionForTopic;
+    private final OffsetCheckpoint checkpoint;
 
     /**
      * @throws LockException if the state directory cannot be locked because another thread holds the lock
@@ -111,11 +112,8 @@ public class ProcessorStateManager implements StateManager {
         }
 
         // load the checkpoint information
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+        checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
         this.checkpointedOffsets = new HashMap<>(checkpoint.read());
-
-        // delete the checkpoint file after finish loading its stored offsets
-        checkpoint.delete();
     }
 
 
@@ -263,7 +261,7 @@ public class ProcessorStateManager implements StateManager {
         }
     }
 
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
 
         for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
@@ -360,30 +358,7 @@ public class ProcessorStateManager implements StateManager {
                 }
 
                 if (ackedOffsets != null) {
-                    Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
-                    for (String storeName : stores.keySet()) {
-                        // only checkpoint the offset to the offsets file if
-                        // it is persistent AND changelog enabled
-                        if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
-                            String changelogTopic = storeToChangelogTopic.get(storeName);
-                            TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
-
-                            Long offset = ackedOffsets.get(topicPartition);
-
-                            if (offset != null) {
-                                // store the last offset + 1 (the log position after restoration)
-                                checkpointOffsets.put(topicPartition, offset + 1);
-                            } else {
-                                // if no record was produced. we need to check the restored offset.
-                                offset = restoredOffsets.get(topicPartition);
-                                if (offset != null)
-                                    checkpointOffsets.put(topicPartition, offset);
-                            }
-                        }
-                    }
-                    // write the checkpoint file before closing, to indicate clean shutdown
-                    OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
-                    checkpoint.write(checkpointOffsets);
+                    checkpoint(ackedOffsets);
                 }
 
             }
@@ -393,6 +368,31 @@ public class ProcessorStateManager implements StateManager {
         }
     }
 
+    // write the checkpoint
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
+        for (String storeName : stores.keySet()) {
+            // only checkpoint the offset to the offsets file if
+            // it is persistent AND changelog enabled
+            if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
+                final String changelogTopic = storeToChangelogTopic.get(storeName);
+                final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
+                if (ackedOffsets.containsKey(topicPartition)) {
+                    // store the last offset + 1 (the log position after restoration)
+                    checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
+                } else if (restoredOffsets.containsKey(topicPartition)) {
+                    checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
+                }
+            }
+        }
+        // write the checkpoint file before closing, to indicate clean shutdown
+        try {
+            checkpoint.write(checkpointedOffsets);
+        } catch (IOException e) {
+            log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e);
+        }
+    }
+
     private int getPartition(String topic) {
         TopicPartition partition = partitionForTopic.get(topic);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4437a19..a27098c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -51,14 +51,15 @@ public class StandbyTask extends AbstractTask {
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
      */
-    public StandbyTask(TaskId id,
-                       String applicationId,
-                       Collection<TopicPartition> partitions,
-                       ProcessorTopology topology,
-                       Consumer<byte[], byte[]> consumer,
-                       Consumer<byte[], byte[]> restoreConsumer,
-                       StreamsConfig config,
-                       StreamsMetrics metrics, final StateDirectory stateDirectory) {
+    public StandbyTask(final TaskId id,
+                       final String applicationId,
+                       final Collection<TopicPartition> partitions,
+                       final ProcessorTopology topology,
+                       final Consumer<byte[], byte[]> consumer,
+                       final Consumer<byte[], byte[]> restoreConsumer,
+                       final StreamsConfig config,
+                       final StreamsMetrics metrics,
+                       final StateDirectory stateDirectory) {
         super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null);
 
         // initialize the topology with its own context
@@ -67,9 +68,9 @@ public class StandbyTask extends AbstractTask {
         log.info("standby-task [{}] Initializing state stores", id());
         initializeStateStores();
 
-        ((StandbyContextImpl) this.processorContext).initialized();
+        this.processorContext.initialized();
 
-        this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+        this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
     }
 
     public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -92,7 +93,7 @@ public class StandbyTask extends AbstractTask {
     public void commit() {
         log.debug("standby-task [{}] Committing its state", id());
         stateMgr.flush(processorContext);
-
+        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
         // reinitialize offset limits
         initializeOffsetLimits();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 7343c85..3102b77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -24,7 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
-interface StateManager {
+interface StateManager extends Checkpointable {
     File baseDir();
 
     void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback);
@@ -36,6 +36,4 @@ interface StateManager {
     StateStore getGlobalStore(final String name);
 
     StateStore getStore(final String name);
-
-    Map<TopicPartition, Long> checkpointedOffsets();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7375fb5..3270596 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -74,8 +74,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
             log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
             // 2) flush produced records in the downstream and change logs of local states
             recordCollector.flush();
-
-            // 3) commit consumed offsets if it is dirty already
+            // 3) write checkpoints for any local state
+            stateMgr.checkpoint(recordCollectorOffsets());
+            // 4) commit consumed offsets if it is dirty already
             commitOffsets();
         }
     };

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
new file mode 100644
index 0000000..dbcc219
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+
+public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
+    private final String name;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final NavigableMap<K, V> map;
+    private volatile boolean open = false;
+
+    private StateSerdes<K, V> serdes;
+
+    public InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
+        this.name = name;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+
+        // TODO: when we have serde associated with class types, we can
+        // improve this situation by passing the comparator here.
+        this.map = new TreeMap<>();
+    }
+
+    public KeyValueStore<K, V> enableLogging() {
+        return new InMemoryKeyValueLoggedStore<>(name, this, keySerde, valueSerde);
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(ProcessorContext context, StateStore root) {
+        // construct the serde
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        if (root != null) {
+            // register the store
+            context.register(root, true, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    // check value for null, to avoid deserialization error.
+                    if (value == null) {
+                        put(serdes.keyFrom(key), null);
+                    } else {
+                        put(serdes.keyFrom(key), serdes.valueFrom(value));
+                    }
+                }
+            });
+        }
+
+        this.open = true;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.open;
+    }
+
+    @Override
+    public synchronized V get(K key) {
+        return this.map.get(key);
+    }
+
+    @Override
+    public synchronized void put(K key, V value) {
+        this.map.put(key, value);
+    }
+
+    @Override
+    public synchronized V putIfAbsent(K key, V value) {
+        V originalValue = get(key);
+        if (originalValue == null) {
+            put(key, value);
+        }
+        return originalValue;
+    }
+
+    @Override
+    public synchronized void putAll(List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries)
+            put(entry.key, entry.value);
+    }
+
+    @Override
+    public synchronized V delete(K key) {
+        return this.map.remove(key);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<K, V> range(K from, K to) {
+        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<K, V> all() {
+        final TreeMap<K, V> copy = new TreeMap<>(this.map);
+        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return this.map.size();
+    }
+
+    @Override
+    public void flush() {
+        // do-nothing since it is in-memory
+    }
+
+    @Override
+    public void close() {
+        this.map.clear();
+        this.open = false;
+    }
+
+    private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
+        private final Iterator<Map.Entry<K, V>> iter;
+
+        private InMemoryKeyValueIterator(Iterator<Map.Entry<K, V>> iter) {
+            this.iter = iter;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            Map.Entry<K, V> entry = iter.next();
+            return new KeyValue<>(entry.getKey(), entry.getValue());
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+
+        @Override
+        public K peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 16967bc..a2346fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -71,6 +71,7 @@ public class AbstractTaskTest {
                                 consumer,
                                 consumer,
                                 false,
+
                                 new StateDirectory("app", TestUtils.tempDirectory().getPath()),
                                 new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index db51cef..8c9cf19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -49,6 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -122,15 +124,15 @@ public class GlobalStateManagerImplTest {
         final Map<TopicPartition, Long> expected = writeCheckpoint();
 
         stateManager.initialize(context);
-        final Map<TopicPartition, Long> offsets = stateManager.checkpointedOffsets();
+        final Map<TopicPartition, Long> offsets = stateManager.checkpointed();
         assertEquals(expected, offsets);
     }
 
     @Test
-    public void shouldDeleteCheckpointFileAfteLoaded() throws Exception {
+    public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception {
         writeCheckpoint();
         stateManager.initialize(context);
-        assertFalse(checkpointFile.exists());
+        assertTrue(checkpointFile.exists());
     }
 
     @Test(expected = StreamsException.class)
@@ -168,7 +170,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldThrowIllegalArgumenExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
         stateManager.initialize(context);
         initializeConsumer(2, 1, t1);
         stateManager.register(store1, false, new TheStateRestoreCallback());
@@ -271,9 +273,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store1, false, stateRestoreCallback);
         final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
         stateManager.close(expected);
-        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
-                                                                                ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        final Map<TopicPartition, Long> result = offsetCheckpoint.read();
+        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
         assertEquals(expected, result);
     }
 
@@ -379,6 +379,41 @@ public class GlobalStateManagerImplTest {
         }
     }
 
+    @Test
+    public void shouldCheckpointOffsets() throws Exception {
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+        stateManager.initialize(context);
+
+        stateManager.checkpoint(offsets);
+
+        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
+        assertThat(result, equalTo(offsets));
+        assertThat(stateManager.checkpointed(), equalTo(offsets));
+    }
+
+    @Test
+    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        initializeConsumer(10, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        initializeConsumer(20, 1, t2);
+        stateManager.register(store2, false, stateRestoreCallback);
+
+        final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
+        stateManager.checkpoint(Collections.singletonMap(t1, 101L));
+
+        final Map<TopicPartition, Long> updatedCheckpoint = stateManager.checkpointed();
+        assertThat(updatedCheckpoint.get(t2), equalTo(initialCheckpoint.get(t2)));
+        assertThat(updatedCheckpoint.get(t1), equalTo(101L));
+    }
+
+    private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
+                                                                                ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        return offsetCheckpoint.read();
+    }
+
     private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) {
         final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
         startOffsets.put(topicPartition, 1L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index df0b73c..66999bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -38,6 +38,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -137,7 +139,19 @@ public class GlobalStateTaskTest {
         globalStateTask.initialize();
         globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.close();
-        assertEquals(expectedOffsets, stateMgr.checkpointedOffsets());
+        assertEquals(expectedOffsets, stateMgr.checkpointed());
         assertTrue(stateMgr.closed);
     }
+
+    @Test
+    public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception {
+        final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(t1, 102L);
+        expectedOffsets.put(t2, 100L);
+        globalStateTask.initialize();
+        globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.flushState();
+        assertThat(stateMgr.checkpointed(), equalTo(expectedOffsets));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 602601a..a9998e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -49,6 +50,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -57,14 +60,10 @@ import static org.junit.Assert.assertFalse;
 
 public class ProcessorStateManagerTest {
 
-    private File baseDir;
-    private StateDirectory stateDirectory;
-
     public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
         private TopicPartition assignedPartition = null;
-        private TopicPartition seekPartition = null;
         private long seekOffset = -1L;
         private boolean seekToBeginingCalled = false;
         private boolean seekToEndCalled = false;
@@ -155,7 +154,6 @@ public class ProcessorStateManagerTest {
             if (seekOffset >= 0)
                 throw new IllegalStateException("RestoreConsumer: offset already seeked");
 
-            seekPartition = partition;
             seekOffset = offset;
             currentOffset = offset;
             super.seek(partition, offset);
@@ -196,11 +194,32 @@ public class ProcessorStateManagerTest {
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
     private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
+    private final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
+    private final MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+    private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1);
+    private final String storeName = "mockStateStore";
+    private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
+    private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0);
+    private final TaskId taskId = new TaskId(0, 1);
+    private final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+    private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName, true);
+    private File baseDir;
+    private File checkpointFile;
+    private StateDirectory stateDirectory;
+    private OffsetCheckpoint checkpoint;
 
     @Before
     public void setup() {
         baseDir = TestUtils.tempDirectory();
         stateDirectory = new StateDirectory(applicationId, baseDir.getPath());
+        checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        checkpoint = new OffsetCheckpoint(checkpointFile);
+        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
     }
 
     @After
@@ -283,8 +302,6 @@ public class ProcessorStateManagerTest {
     public void testRegisterNonPersistentStore() throws IOException {
         long lastCheckpointedOffset = 10L;
 
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
         checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
 
@@ -296,8 +313,6 @@ public class ProcessorStateManagerTest {
         TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
-        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
-
         ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
@@ -308,7 +323,7 @@ public class ProcessorStateManagerTest {
             restoreConsumer.reset();
 
             ArrayList<Integer> expectedKeys = new ArrayList<>();
-            long offset = -1L;
+            long offset;
             for (int i = 1; i <= 3; i++) {
                 offset = (long) (i + 100);
                 int key = i;
@@ -329,12 +344,13 @@ public class ProcessorStateManagerTest {
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
-
     }
 
     @Test
     public void testChangeLogOffsets() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(
+                new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
         long lastCheckpointedOffset = 10L;
         String storeName1 = "store1";
         String storeName2 = "store2";
@@ -349,10 +365,7 @@ public class ProcessorStateManagerTest {
         storeToChangelogTopic.put(storeName2, storeTopicName2);
         storeToChangelogTopic.put(storeName3, storeTopicName3);
 
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
-
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+        offsetCheckpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
 
         restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
                 new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
@@ -389,7 +402,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(store2, true, store2.stateRestoreCallback);
             stateMgr.register(store3, true, store3.stateRestoreCallback);
 
-            Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
+            Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
 
             assertEquals(3, changeLogOffsets.size());
             assertTrue(changeLogOffsets.containsKey(partition1));
@@ -407,20 +420,12 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testGetStore() throws IOException {
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
-
-        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-
-        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
         try {
-            stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
 
             assertNull(stateMgr.getStore("noSuchStore"));
-            assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
+            assertEquals(nonPersistentStore, stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -429,30 +434,15 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testFlushAndClose() throws IOException {
-        final TaskId taskId = new TaskId(0, 1);
-        File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         // write an empty checkpoint file
-        OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
-        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
-
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
-        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
         // set up ack'ed offsets
-        HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
+        final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
         ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
         ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
         ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
 
-        MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-
         ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
@@ -460,8 +450,8 @@ public class ProcessorStateManagerTest {
             }
         });
         try {
-            // make sure the checkpoint file is deleted
-            assertFalse(checkpointFile.exists());
+            // make sure the checkpoint file isn't deleted
+            assertTrue(checkpointFile.exists());
 
             restoreConsumer.reset();
             stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
@@ -482,42 +472,122 @@ public class ProcessorStateManagerTest {
         assertTrue(checkpointFile.exists());
 
         // the checkpoint file should contain an offset from the persistent store only.
-        OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
-        Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
+        final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
         assertEquals(1, checkpointedOffsets.size());
         assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
     }
 
     @Test
     public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
-        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
         ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
-        stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
+        stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
 
+
     @Test
-    public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception {
-        final TaskId taskId = new TaskId(0, 1);
-        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
-        // write an empty checkpoint file
-        final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
-        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
+    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception {
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(persistentStorePartition, 99L);
+        checkpoint.write(offsets);
 
-        final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         false,
+                                                                         stateDirectory,
+                                                                         Collections.<String, String>emptyMap());
 
-        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
-                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-        ));
+        restoreConsumer.reset();
+        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.close(null);
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(offsets));
+    }
 
+    @Test
+    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception {
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         false,
+                                                                         stateDirectory,
+                                                                         Collections.singletonMap(persistentStore.name(),
+                                                                                                  persistentStoreTopicName));
+        restoreConsumer.reset();
+        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
-        final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 11L)));
+    }
+
+    @Test
+    public void shouldWriteCheckpointForStandbyReplica() throws Exception {
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         true,
+                                                                         stateDirectory,
+                                                                         Collections.singletonMap(persistentStore.name(),
+                                                                                                  persistentStoreTopicName));
 
         restoreConsumer.reset();
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
-        stateMgr.close(null);
-        assertFalse(checkpointFile.exists());
+        final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
+        stateMgr.updateStandbyStates(persistentStorePartition,
+                                     Collections.singletonList(
+                                             new ConsumerRecord<>(persistentStorePartition.topic(),
+                                                                                persistentStorePartition.partition(),
+                                                                                888L,
+                                                                                bytes,
+                                                                                bytes)));
+
+        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L)));
+
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
+        final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
+
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         true,
+                                                                         stateDirectory,
+                                                                         Collections.singletonMap(nonPersistentStoreName,
+                                                                                                  nonPersistentStoreTopicName));
+
+        restoreConsumer.reset();
+        stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+        stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception {
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
+                                                                         noPartitions,
+                                                                         restoreConsumer,
+                                                                         true,
+                                                                         stateDirectory,
+                                                                         Collections.<String, String>emptyMap());
+
+        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+
+        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 2d32e78..4c3356a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -53,6 +54,8 @@ import java.util.Properties;
 import java.util.Set;
 
 import static java.util.Collections.singleton;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -315,7 +318,7 @@ public class StandbyTaskTest {
         final String changelogName = "test-application-my-store-changelog";
         final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
         consumer.assign(partitions);
-        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
         consumer.commitSync(committedOffsets);
 
@@ -326,9 +329,51 @@ public class StandbyTaskTest {
         final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
         StreamsConfig config = createConfig(baseDir);
         new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config,
-            new MockStreamsMetrics(new Metrics()), stateDirectory);
+                        new MockStreamsMetrics(new Metrics()), stateDirectory);
 
     }
+
+    @Test
+    public void shouldCheckpointStoreOffsetsOnCommit() throws Exception {
+        consumer.assign(Utils.mkList(ktable));
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L));
+        consumer.commitSync(committedOffsets);
+
+        restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
+                new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0])));
+
+        final TaskId taskId = new TaskId(0, 0);
+        final StreamsConfig config = createConfig(baseDir);
+        final StandbyTask task = new StandbyTask(taskId,
+                                                 applicationId,
+                                                 ktablePartitions,
+                                                 ktableTopology,
+                                                 consumer,
+                                                 restoreStateConsumer,
+                                                 config,
+                                                 null,
+                                                 stateDirectory
+        );
+
+
+        restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+        final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
+        task.update(ktable, Collections.singletonList(new ConsumerRecord<>(ktable.topic(),
+                                                                           ktable.partition(),
+                                                                           50L,
+                                                                           serializedValue,
+                                                                           serializedValue)));
+
+        task.commit();
+
+        final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
+                                                                                   ProcessorStateManager.CHECKPOINT_FILE_NAME)).read();
+        assertThat(checkpoint, equalTo(Collections.singletonMap(ktable, 51L)));
+
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0479b9d..7f27fc4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
@@ -39,6 +41,8 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
@@ -59,6 +63,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -437,6 +443,63 @@ public class StreamTaskTest {
         assertTrue(flushed.get());
     }
 
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCheckpointOffsetsOnCommit() throws Exception {
+        final String storeName = "test";
+        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);
+        final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
+            @Override
+            public void init(final ProcessorContext context, final StateStore root) {
+                context.register(root, true, null);
+            }
+
+            @Override
+            public boolean persistent() {
+                return true;
+            }
+        };
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                                                 Collections.<String, SourceNode>emptyMap(),
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>singletonList(inMemoryStore),
+                                                                 Collections.singletonMap(storeName, changelogTopic),
+                                                                 Collections.<StateStore>emptyList());
+
+        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
+        final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
+            @Override
+            public Map<TopicPartition, Long> offsets() {
+
+                return Collections.singletonMap(partition, 543L);
+            }
+        };
+
+        restoreStateConsumer.updatePartitions(changelogTopic,
+                                              Collections.singletonList(
+                                                      new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0])));
+        restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
+        restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
+
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final TaskId taskId = new TaskId(0, 0);
+        final MockTime time = new MockTime();
+        final StreamsConfig config = createConfig(baseDir);
+        final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer,
+                                                     restoreStateConsumer, config, streamsMetrics,
+                                                     stateDirectory, new ThreadCache("testCache", 0, streamsMetrics),
+                                                     time, recordCollector);
+
+        time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+
+        streamTask.commit();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
+                                                                          ProcessorStateManager.CHECKPOINT_FILE_NAME));
+
+        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L)));
+    }
+
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index 2f3ef26..612a0da 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -67,6 +67,11 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     }
 
     @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
+        this.offsets.putAll(offsets);
+    }
+
+    @Override
     public StateStore getGlobalStore(final String name) {
         return null;
     }
@@ -77,7 +82,7 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return offsets;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 89ca0df..7ace43a 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -204,7 +204,8 @@ public class ProcessorTopologyTestDriver {
             final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
-                                                        stateManager);
+                                                        stateManager
+            );
             globalStateTask.initialize();
         }
 


Mime
View raw message