kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-5363 (KIP-167): implementing bulk load, restoration event notification
Date Fri, 28 Jul 2017 18:31:00 GMT
KAFKA-5363 (KIP-167): implementing bulk load, restoration event notification

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3325 from bbejeck/KAFKA-5363_add_ability_to_batch_restore


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c50c941a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c50c941a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c50c941a

Branch: refs/heads/trunk
Commit: c50c941af1395d010160790f0127f06106cc69c8
Parents: 4003c93
Author: Bill Bejeck <bill@confluent.io>
Authored: Fri Jul 28 11:30:56 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jul 28 11:30:56 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  20 ++
 ...bstractNotifyingBatchingRestoreCallback.java |  83 +++++++
 .../AbstractNotifyingRestoreCallback.java       |  72 ++++++
 .../processor/BatchingStateRestoreCallback.java |  41 ++++
 .../streams/processor/StateRestoreListener.java |  84 +++++++
 .../internals/CompositeRestoreListener.java     | 109 +++++++++
 .../internals/GlobalStateManagerImpl.java       |  12 +-
 .../internals/ProcessorStateManager.java        |   5 +-
 .../processor/internals/StateRestorer.java      |  38 ++-
 .../internals/StoreChangelogReader.java         |  41 +++-
 .../processor/internals/StreamThread.java       |  17 +-
 .../WrappedBatchingStateRestoreCallback.java    |  46 ++++
 .../streams/state/internals/RocksDBStore.java   |  56 ++++-
 .../apache/kafka/streams/KafkaStreamsTest.java  |  14 ++
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../internals/CompositeRestoreListenerTest.java | 232 +++++++++++++++++++
 .../processor/internals/StandbyTaskTest.java    |   4 +-
 .../processor/internals/StateRestorerTest.java  |  22 +-
 .../internals/StoreChangelogReaderTest.java     | 117 ++++++++--
 .../processor/internals/StreamTaskTest.java     |   5 +-
 .../processor/internals/StreamThreadTest.java   |   8 +-
 ...WrappedBatchingStateRestoreCallbackTest.java |  51 ++++
 .../state/internals/RocksDBStoreTest.java       |  82 ++++++-
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 .../apache/kafka/test/MockProcessorContext.java |  38 ++-
 .../kafka/test/MockStateRestoreListener.java    |  90 +++++++
 .../kafka/test/ProcessorTopologyTestDriver.java |   5 +-
 27 files changed, 1233 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ae5301a..0d88efb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -143,6 +144,7 @@ public class KafkaStreams {
     private final String logPrefix;
     private final StreamsMetadataState streamsMetadataState;
     private final StreamsConfig config;
+    private StateRestoreListener stateRestoreListener;
     private final StateDirectory stateDirectory;
 
     // container states
@@ -778,6 +780,24 @@ public class KafkaStreams {
     }
 
     /**
+     * Set the listener which is triggered whenever a {@link StateStore} is being restored in order to resume
+     * processing.
+     *
+     * @param globalStateRestoreListener The listener triggered when {@link StateStore} is being restored.
+     */
+    public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) {
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (StreamThread thread : threads) {
+                    thread.setGlobalStateRestoreListener(globalStateRestoreListener);
+                }
+            } else {
+                throw new IllegalStateException("Can only set the GlobalRestoreListener in the CREATED state");
+            }
+        }
+    }
+
+    /**
      * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
      * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
      * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java
new file mode 100644
index 0000000..7b5b5d0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Abstract implementation of the  {@link BatchingStateRestoreCallback} used for batch restoration operations.
+ *
+ * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)},
+ * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}.
+ */
+public abstract class AbstractNotifyingBatchingRestoreCallback implements BatchingStateRestoreCallback, StateRestoreListener {
+
+    /**
+     * Single put restore operations not supported, please use {@link AbstractNotifyingRestoreCallback}
+     * or {@link StateRestoreCallback} instead for single action restores.
+     */
+    @Override
+    public void restore(final byte[] key,
+                        final byte[] value) {
+        throw new UnsupportedOperationException("Single restore not supported");
+    }
+
+
+    /**
+     * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)
+     *
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     *
+     */
+    @Override
+    public void onRestoreStart(final TopicPartition topicPartition,
+                               final String storeName,
+                               final long startingOffset,
+                               final long endingOffset) {
+
+    }
+
+
+    /**
+     * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)
+     *
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     *
+     */
+    @Override
+    public void onBatchRestored(final TopicPartition topicPartition,
+                                final String storeName,
+                                final long batchEndOffset,
+                                final long numRestored) {
+
+    }
+
+    /**
+     * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long)
+     *
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     *
+     */
+    @Override
+    public void onRestoreEnd(final TopicPartition topicPartition,
+                             final String storeName,
+                             final long totalRestored) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java
new file mode 100644
index 0000000..2eb3f66
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Abstract implementation of the  {@link StateRestoreCallback} used for batch restoration operations.
+ *
+ * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)},
+ * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}.
+ */
+public abstract class AbstractNotifyingRestoreCallback implements StateRestoreCallback, StateRestoreListener {
+
+
+    /**
+     * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)
+     *
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     *
+     */
+    @Override
+    public void onRestoreStart(final TopicPartition topicPartition,
+                               final String storeName,
+                               final long startingOffset,
+                               final long endingOffset) {
+
+    }
+
+
+    /**
+     * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)
+     *
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     *
+     */
+    @Override
+    public void onBatchRestored(final TopicPartition topicPartition,
+                                final String storeName,
+                                final long batchEndOffset,
+                                final long numRestored) {
+
+    }
+
+    /**
+     * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long)
+     *
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     *
+     */
+    @Override
+    public void onRestoreEnd(final TopicPartition topicPartition,
+                             final String storeName,
+                             final long totalRestored) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
new file mode 100644
index 0000000..3447a57
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Collection;
+
+/**
+ * Interface for batching restoration of a {@link StateStore}
+ *
+ * It is expected that implementations of this class will not call the {@link StateRestoreCallback#restore(byte[],
+ * byte[])} method.
+ */
+public interface BatchingStateRestoreCallback extends StateRestoreCallback {
+
+    /**
+     * Called to restore a number of records.  This method is called repeatedly until the {@link StateStore} is fulled
+     * restored.
+     *
+     * @param records the records to restore.
+     */
+    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
new file mode 100644
index 0000000..c80a736
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Class for listening to various states of the restoration process of a StateStore.
+ *
+ * When calling {@link org.apache.kafka.streams.KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener)}
+ * the passed instance is expected to be stateless since the {@code StateRestoreListener} is shared
+ * across all {@link org.apache.kafka.streams.processor.internals.StreamThread} instances.
+ *
+ * Users desiring stateful operations will need to provide synchronization internally in
+ * the {@code StateRestorerListener} implementation.
+ *
+ * When used for monitoring a single {@link StateStore} using either {@link AbstractNotifyingRestoreCallback} or
+ * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is necessary
+ * as each StreamThread has its own StateStore instance.
+ *
+ * Incremental updates are exposed so users can estimate how much progress has been made.
+ */
+public interface StateRestoreListener {
+
+    /**
+     * Method called at the very beginning of {@link StateStore} restoration.
+     *
+     * @param topicPartition the TopicPartition containing the values to restore
+     * @param storeName      the name of the store undergoing restoration
+     * @param startingOffset the starting offset of the entire restoration process for this TopicPartition
+     * @param endingOffset   the ending offset of the entire restoration process for this TopicPartition
+     */
+    void onRestoreStart(final TopicPartition topicPartition,
+                        final String storeName,
+                        final long startingOffset,
+                        final long endingOffset);
+
+    /**
+     * Method called after restoring a batch of records.  In this case the maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     *
+     * This method is called after restoring each batch and it is advised to keep processing to a minimum.
+     * Any heavy processing will hold up recovering the next batch, hence slowing down the restore process as a
+     * whole.
+     *
+     * If you need to do any extended processing or connecting to an external service consider doing so asynchronously.
+     *
+     * @param topicPartition the TopicPartition containing the values to restore
+     * @param storeName the name of the store undergoing restoration
+     * @param batchEndOffset the ending offset for the current restored batch for this TopicPartition
+     * @param numRestored the total number of records restored in this batch for this TopicPartition
+     */
+    void onBatchRestored(final TopicPartition topicPartition,
+                         final String storeName,
+                         final long batchEndOffset,
+                         final long numRestored);
+
+    /**
+     * Method called when restoring the {@link StateStore} is complete.
+     *
+     * @param topicPartition the TopicPartition containing the values to restore
+     * @param storeName the name of the store just restored
+     * @param totalRestored the total number of records restored for this TopicPartition
+     */
+    void onRestoreEnd(final TopicPartition topicPartition,
+                      final String storeName,
+                      final long totalRestored);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
new file mode 100644
index 0000000..138be77
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+
+import java.util.Collection;
+
+public class CompositeRestoreListener implements BatchingStateRestoreCallback, StateRestoreListener {
+
+    public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener();
+    private final BatchingStateRestoreCallback internalBatchingRestoreCallback;
+    private final StateRestoreListener storeRestoreListener;
+    private StateRestoreListener globalRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
+
+    CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
+
+        if (stateRestoreCallback instanceof StateRestoreListener) {
+            storeRestoreListener = (StateRestoreListener) stateRestoreCallback;
+        } else {
+            storeRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
+        }
+
+        internalBatchingRestoreCallback = getBatchingRestoreCallback(stateRestoreCallback);
+    }
+
+    @Override
+    public void onRestoreStart(final TopicPartition topicPartition,
+                               final String storeName,
+                               final long startingOffset,
+                               final long endingOffset) {
+        globalRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+        storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+    }
+
+    @Override
+    public void onBatchRestored(final TopicPartition topicPartition,
+                                final String storeName,
+                                final long batchEndOffset,
+                                final long numRestored) {
+        globalRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+        storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+    }
+
+    @Override
+    public void onRestoreEnd(final TopicPartition topicPartition,
+                             final String storeName,
+                             final long totalRestored) {
+        globalRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+        storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+
+    }
+
+    @Override
+    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+        internalBatchingRestoreCallback.restoreAll(records);
+    }
+
+    void setGlobalRestoreListener(final StateRestoreListener globalRestoreListener) {
+        if (globalRestoreListener != null) {
+            this.globalRestoreListener = globalRestoreListener;
+        }
+    }
+
+    @Override
+    public void restore(final byte[] key,
+                        final byte[] value) {
+        throw new UnsupportedOperationException("Single restore functionality shouldn't be called directly but "
+                                                + "through the delegated StateRestoreCallback instance");
+    }
+
+    private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback restoreCallback) {
+        if (restoreCallback instanceof  BatchingStateRestoreCallback) {
+            return (BatchingStateRestoreCallback) restoreCallback;
+        }
+
+        return new WrappedBatchingStateRestoreCallback(restoreCallback);
+    }
+
+
+    private static final class NoOpStateRestoreListener extends AbstractNotifyingBatchingRestoreCallback {
+
+        @Override
+        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 1b3e036..d9205a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -21,9 +21,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -169,15 +171,23 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
 
             long offset = consumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
+            BatchingStateRestoreCallback
+                stateRestoreAdapter =
+                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
+                                                     BatchingStateRestoreCallback)
+                                                ? stateRestoreCallback
+                                                : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
 
             while (offset < highWatermark) {
                 final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+                final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                 for (ConsumerRecord<byte[], byte[]> record : records) {
                     offset = record.offset() + 1;
                     if (record.key() != null) {
-                        stateRestoreCallback.restore(record.key(), record.value());
+                        restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                     }
                 }
+                stateRestoreAdapter.restoreAll(restoreRecords);
             }
             checkpointableOffsets.put(topicPartition, offset);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index b1dcdf7..9f45ddd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -171,10 +171,11 @@ public class ProcessorStateManager implements StateManager {
         } else {
             log.trace("{} Restoring state store {} from changelog topic {}", logPrefix, store.name(), topic);
             final StateRestorer restorer = new StateRestorer(storePartition,
-                                                             stateRestoreCallback,
+                                                             new CompositeRestoreListener(stateRestoreCallback),
                                                              checkpointedOffsets.get(storePartition),
                                                              offsetLimit(storePartition),
-                                                             store.persistent());
+                                                             store.persistent(),
+                                                             store.name());
             changelogReader.register(restorer);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 79bfd1d..9076572 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -17,30 +17,36 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+
+import java.util.Collection;
 
 public class StateRestorer {
+
     static final int NO_CHECKPOINT = -1;
 
     private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final TopicPartition partition;
-    private final StateRestoreCallback stateRestoreCallback;
-
+    private final String storeName;
+    private final CompositeRestoreListener compositeRestoreListener;
     private long restoredOffset;
     private long startingOffset;
 
     StateRestorer(final TopicPartition partition,
-                  final StateRestoreCallback stateRestoreCallback,
+                  final CompositeRestoreListener compositeRestoreListener,
                   final Long checkpoint,
                   final long offsetLimit,
-                  final boolean persistent) {
+                  final boolean persistent,
+                  final String storeName) {
         this.partition = partition;
-        this.stateRestoreCallback = stateRestoreCallback;
+        this.compositeRestoreListener = compositeRestoreListener;
         this.checkpoint = checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
+        this.storeName = storeName;
     }
 
     public TopicPartition partition() {
@@ -51,14 +57,30 @@ public class StateRestorer {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
 
-    void restore(final byte[] key, final byte[] value) {
-        stateRestoreCallback.restore(key, value);
+    void restoreStarted(long startingOffset, long endingOffset) {
+        compositeRestoreListener.onRestoreStart(partition, storeName, startingOffset, endingOffset);
+    }
+
+    void restoreDone() {
+        compositeRestoreListener.onRestoreEnd(partition, storeName, restoredNumRecords());
+    }
+
+    void restoreBatchCompleted(long currentRestoredOffset, int numRestored) {
+        compositeRestoreListener.onBatchRestored(partition, storeName, currentRestoredOffset, numRestored);
+    }
+
+    void restore(final Collection<KeyValue<byte[], byte[]>> records) {
+        compositeRestoreListener.restoreAll(records);
     }
 
     boolean isPersistent() {
         return persistent;
     }
 
+    void setGlobalRestoreListener(StateRestoreListener globalStateRestoreListener) {
+        this.compositeRestoreListener.setGlobalRestoreListener(globalStateRestoreListener);
+    }
+
     void setRestoredOffset(final long restoredOffset) {
         this.restoredOffset = Math.min(offsetLimit, restoredOffset);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 3d5a793..842721d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -23,7 +23,9 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,17 +47,21 @@ public class StoreChangelogReader implements ChangelogReader {
     private final long partitionValidationTimeoutMs;
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
+    private final StateRestoreListener stateRestoreListener;
 
-    public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
+    public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time,
+                                final long partitionValidationTimeoutMs, final StateRestoreListener stateRestoreListener) {
         this.time = time;
         this.consumer = consumer;
         this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
 
         this.logPrefix = String.format("stream-thread [%s]", threadId);
+        this.stateRestoreListener = stateRestoreListener;
     }
 
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
-        this("", consumer, time, partitionValidationTimeoutMs);
+    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time,
+                                long partitionValidationTimeoutMs, final StateRestoreListener stateRestoreListener) {
+        this("", consumer, time, partitionValidationTimeoutMs, stateRestoreListener);
     }
 
     @Override
@@ -93,6 +99,7 @@ public class StoreChangelogReader implements ChangelogReader {
     @Override
     public void register(final StateRestorer restorer) {
         if (restorer.offsetLimit() > 0) {
+            restorer.setGlobalRestoreListener(stateRestoreListener);
             stateRestorers.put(restorer.partition(), restorer);
         }
     }
@@ -115,6 +122,8 @@ public class StoreChangelogReader implements ChangelogReader {
                     restorer.setRestoredOffset(restorer.checkpoint());
                 } else {
                     needsRestoring.put(topicPartition, restorer);
+                    final Long endOffset = endOffsets.get(topicPartition);
+                    restorer.restoreStarted(restorer.startingOffset(), endOffset);
                 }
             }
 
@@ -202,21 +211,39 @@ public class StoreChangelogReader implements ChangelogReader {
                     restorer.startingOffset(),
                     restorer.restoredOffset());
 
+            restorer.restoreDone();
+
             partitionIterator.remove();
         }
     }
 
-    private long processNext(final List<ConsumerRecord<byte[], byte[]>> records, final StateRestorer restorer, final Long endOffset) {
+    private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
+                             final StateRestorer restorer, final Long endOffset) {
+        final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
+        long nextPosition = -1;
+
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
-                return offset;
+                nextPosition = record.offset();
+                break;
             }
             if (record.key() != null) {
-                restorer.restore(record.key(), record.value());
+                restoreRecords.add(KeyValue.pair(record.key(), record.value()));
             }
         }
-        return consumer.position(restorer.partition());
+
+        if (nextPosition == -1) {
+            nextPosition = consumer.position(restorer.partition());
+        }
+
+        if (!restoreRecords.isEmpty()) {
+            restorer.restore(restoreRecords);
+            restorer.restoreBatchCompleted(nextPosition, records.size());
+
+        }
+
+        return nextPosition;
     }
 
     private boolean hasPartition(final TopicPartition topicPartition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3bafcd0..b3425db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -44,6 +44,8 @@ import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -183,7 +185,9 @@ public class StreamThread extends Thread {
 
             final long start = time.milliseconds();
             try {
-                storeChangelogReader = new StoreChangelogReader(getName(), restoreConsumer, time, requestTimeOut);
+                storeChangelogReader =
+                    new StoreChangelogReader(getName(), restoreConsumer, time, requestTimeOut,
+                                             globalStateRestoreListener);
                 setState(State.ASSIGNING_PARTITIONS);
                 // do this first as we may have suspended standby tasks that
                 // will become active or vice versa
@@ -438,6 +442,7 @@ public class StreamThread extends Thread {
     private final TaskCreator taskCreator = new TaskCreator();
 
     final ConsumerRebalanceListener rebalanceListener;
+    private StateRestoreListener globalStateRestoreListener;
     private final static int UNLIMITED_RECORDS = -1;
 
     public StreamThread(final InternalTopologyBuilder builder,
@@ -996,6 +1001,16 @@ public class StreamThread extends Thread {
     }
 
     /**
+     * Set the listener invoked at the beginning, end of batch updates and the conclusion of
+     * restoring a {@link StateStore}.
+     *
+     * @param globalStateRestoreListener  listener for capturing state store restoration status.
+     */
+    public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) {
+        this.globalStateRestoreListener = globalStateRestoreListener;
+    }
+
+    /**
      * @return The state this instance is in
      */
     public State state() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java
new file mode 100644
index 0000000..02814d1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+
+import java.util.Collection;
+
+public class WrappedBatchingStateRestoreCallback implements BatchingStateRestoreCallback {
+
+    private final StateRestoreCallback stateRestoreCallback;
+
+    public WrappedBatchingStateRestoreCallback(final StateRestoreCallback stateRestoreCallback) {
+        this.stateRestoreCallback = stateRestoreCallback;
+    }
+
+    @Override
+    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+        for (KeyValue<byte[], byte[]> record : records) {
+            restore(record.key, record.value);
+        }
+    }
+
+    @Override
+    public void restore(final byte[] key,
+                        final byte[] value) {
+        stateRestoreCallback.restore(key, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 4d93a9a..e68db14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
@@ -24,8 +25,8 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -47,14 +48,15 @@ import org.rocksdb.WriteOptions;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * A persistent key-value store based on RocksDB.
@@ -97,6 +99,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private WriteOptions wOptions;
     private FlushOptions fOptions;
 
+    private volatile boolean prepareForBulkload = false;
+    private ProcessorContext internalProcessorContext;
+
     protected volatile boolean open = false;
 
     RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
@@ -135,6 +140,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         // (this could be a bug in the RocksDB code and their devs have been contacted).
         options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
 
+        if (prepareForBulkload) {
+            options.prepareForBulkLoad();
+        }
+
         wOptions = new WriteOptions();
         wOptions.setDisableWAL(true);
 
@@ -169,14 +178,27 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     public void init(ProcessorContext context, StateStore root) {
         // open the DB dir
+        this.internalProcessorContext = context;
         openDB(context);
 
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        context.register(root, false, new StateRestoreCallback() {
+        context.register(root, false, new AbstractNotifyingBatchingRestoreCallback() {
+            @Override
+            public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
+                restoreAllInternal(records);
+            }
+
+            @Override
+            public void onRestoreStart(TopicPartition topicPartition, String storeName,
+                                       long startingOffset, long endingOffset) {
+                toggleDbForBulkLoading(true);
+            }
+
             @Override
-            public void restore(byte[] key, byte[] value) {
-                putInternal(key, value);
+            public void onRestoreEnd(TopicPartition topicPartition, String storeName,
+                                     long totalRestored) {
+                toggleDbForBulkLoading(false);
             }
         });
 
@@ -198,6 +220,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    // visible for testing
+    boolean isPrepareForBulkload() {
+        return prepareForBulkload;
+    }
+
     @Override
     public String name() {
         return this.name;
@@ -239,6 +266,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    private void toggleDbForBulkLoading(boolean prepareForBulkload) {
+        close();
+        this.prepareForBulkload = prepareForBulkload;
+        openDB(internalProcessorContext);
+        open = true;
+    }
+
+    @SuppressWarnings("unchecked")
     @Override
     public synchronized void put(K key, V value) {
         Objects.requireNonNull(key, "key cannot be null");
@@ -258,6 +293,17 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         return originalValue;
     }
 
+    private void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> records) {
+        try (WriteBatch batch = new WriteBatch()) {
+            for (KeyValue<byte[], byte[]> record : records) {
+                batch.put(record.key, record.value);
+            }
+            db.write(wOptions, batch);
+        } catch (RocksDBException e) {
+            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
+        }
+    }
+
     private void putInternal(byte[] rawKey, byte[] rawValue) {
         if (rawValue == null) {
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 3f60d69..a6532df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
@@ -257,6 +258,19 @@ public class KafkaStreamsTest {
     }
 
     @Test
+    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
+        streams.start();
+        try {
+            streams.setGlobalStateRestoreListener(new MockStateRestoreListener());
+            fail("Should throw an IllegalStateException");
+        } catch (final IllegalStateException e) {
+            Assert.assertEquals("Can only set the GlobalRestoreListener in the CREATED state", e.getMessage());
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
     public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
         streams.start();
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index ba3230a..4f67555 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -77,7 +78,7 @@ public class AbstractTaskTest {
                                                       Collections.<String, String>emptyMap(),
                                                       Collections.<StateStore>emptyList()),
                                 consumer,
-                                new StoreChangelogReader(consumer, Time.SYSTEM, 5000),
+                                new StoreChangelogReader(consumer, Time.SYSTEM, 5000, new MockStateRestoreListener()),
                                 false,
                                 new StateDirectory("app", TestUtils.tempDirectory().getPath(), time),
                                 new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())),

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
new file mode 100644
index 0000000..4a7877a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.test.MockStateRestoreListener;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+public class CompositeRestoreListenerTest {
+
+    private final MockStateRestoreCallback stateRestoreCallback = new MockStateRestoreCallback();
+    private final MockBatchingStateRestoreCallback
+        batchingStateRestoreCallback =
+        new MockBatchingStateRestoreCallback();
+    private final MockNoListenBatchingStateRestoreCallback
+        noListenBatchingStateRestoreCallback =
+        new MockNoListenBatchingStateRestoreCallback();
+    private final MockStateRestoreListener reportingStoreListener = new MockStateRestoreListener();
+    private final byte[] key = "key".getBytes(Charset.forName("UTF-8"));
+    private final byte[] value = "value".getBytes(Charset.forName("UTF-8"));
+    private final Collection<KeyValue<byte[], byte[]>> records = Collections.singletonList(KeyValue.pair(key, value));
+    private final String storeName = "test_store";
+    private final long startOffset = 0L;
+    private final long endOffset = 1L;
+    private final long batchOffset = 1L;
+    private final long numberRestored = 1L;
+    private final TopicPartition topicPartition = new TopicPartition("testTopic", 1);
+
+    private CompositeRestoreListener compositeRestoreListener;
+
+
+    @Test
+    public void shouldRestoreInNonBatchMode() {
+        setUpCompositeRestoreListener(stateRestoreCallback);
+        compositeRestoreListener.restoreAll(records);
+        assertThat(stateRestoreCallback.restoredKey, is(key));
+        assertThat(stateRestoreCallback.restoredValue, is(value));
+    }
+
+    @Test
+    public void shouldRestoreInBatchMode() {
+        setUpCompositeRestoreListener(batchingStateRestoreCallback);
+        compositeRestoreListener.restoreAll(records);
+        assertThat(batchingStateRestoreCallback.restoredRecords, is(records));
+    }
+
+    @Test
+    public void shouldNotifyRestoreStartNonBatchMode() {
+        setUpCompositeRestoreListener(stateRestoreCallback);
+        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
+        assertStateRestoreListenerOnStartNotification(stateRestoreCallback);
+        assertStateRestoreListenerOnStartNotification(reportingStoreListener);
+    }
+
+    @Test
+    public void shouldNotifyRestoreStartBatchMode() {
+        setUpCompositeRestoreListener(batchingStateRestoreCallback);
+        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
+        assertStateRestoreListenerOnStartNotification(batchingStateRestoreCallback);
+        assertStateRestoreListenerOnStartNotification(reportingStoreListener);
+    }
+
+    @Test
+    public void shouldNotifyRestoreProgressNonBatchMode() {
+        setUpCompositeRestoreListener(stateRestoreCallback);
+        compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored);
+        assertStateRestoreListenerOnBatchCompleteNotification(stateRestoreCallback);
+        assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener);
+    }
+
+    @Test
+    public void shouldNotifyRestoreProgressBatchMode() {
+        setUpCompositeRestoreListener(batchingStateRestoreCallback);
+        compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored);
+        assertStateRestoreListenerOnBatchCompleteNotification(batchingStateRestoreCallback);
+        assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener);
+    }
+
+    @Test
+    public void shouldNotifyRestoreEndInNonBatchMode() {
+        setUpCompositeRestoreListener(stateRestoreCallback);
+        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
+        assertStateRestoreOnEndNotification(stateRestoreCallback);
+        assertStateRestoreOnEndNotification(reportingStoreListener);
+    }
+
+    @Test
+    public void shouldNotifyRestoreEndInBatchMode() {
+        setUpCompositeRestoreListener(batchingStateRestoreCallback);
+        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
+        assertStateRestoreOnEndNotification(batchingStateRestoreCallback);
+        assertStateRestoreOnEndNotification(reportingStoreListener);
+    }
+
+    @Test
+    public void shouldHandleNullReportStoreListener() {
+        compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback);
+        compositeRestoreListener.setGlobalRestoreListener(null);
+
+        compositeRestoreListener.restoreAll(records);
+        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
+        compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored);
+        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
+
+        assertThat(batchingStateRestoreCallback.restoredRecords, is(records));
+        assertStateRestoreOnEndNotification(batchingStateRestoreCallback);
+    }
+
+    @Test
+    public void shouldHandleNoRestoreListener() {
+        compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
+        compositeRestoreListener.setGlobalRestoreListener(null);
+
+        compositeRestoreListener.restoreAll(records);
+        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
+        compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored);
+        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
+
+        assertThat(noListenBatchingStateRestoreCallback.restoredRecords, is(records));
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowExceptionWhenSinglePutDirectlyCalled() {
+        compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
+        compositeRestoreListener.setGlobalRestoreListener(null);
+
+        compositeRestoreListener.restore(key, value);
+    }
+
+    private void assertStateRestoreListenerOnStartNotification(MockStateRestoreListener restoreListener) {
+        assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_START));
+        assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
+        assertThat(restoreListener.restoreStartOffset, is(startOffset));
+        assertThat(restoreListener.restoreEndOffset, is(endOffset));
+    }
+
+    private void assertStateRestoreListenerOnBatchCompleteNotification(MockStateRestoreListener restoreListener) {
+        assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_BATCH));
+        assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
+        assertThat(restoreListener.restoredBatchOffset, is(batchOffset));
+        assertThat(restoreListener.numBatchRestored, is(numberRestored));
+    }
+
+    private void assertStateRestoreOnEndNotification(MockStateRestoreListener restoreListener) {
+        assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_END));
+        assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
+        assertThat(restoreListener.totalNumRestored, is(numberRestored));
+    }
+
+
+    private void setUpCompositeRestoreListener(StateRestoreCallback stateRestoreCallback) {
+        compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback);
+        compositeRestoreListener.setGlobalRestoreListener(reportingStoreListener);
+    }
+
+
+    private static class MockStateRestoreCallback extends MockStateRestoreListener implements StateRestoreCallback {
+
+        byte[] restoredKey;
+        byte[] restoredValue;
+
+        @Override
+        public void restore(final byte[] key, final byte[] value) {
+            restoredKey = key;
+            restoredValue = value;
+        }
+    }
+
+    private static class MockBatchingStateRestoreCallback extends MockStateRestoreListener
+        implements BatchingStateRestoreCallback {
+
+        Collection<KeyValue<byte[], byte[]>> restoredRecords;
+
+        @Override
+        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+            restoredRecords = records;
+        }
+
+        @Override
+        public void restore(final byte[] key, final byte[] value) {
+            throw new IllegalStateException("Should not be called");
+
+        }
+    }
+
+    private static class MockNoListenBatchingStateRestoreCallback implements BatchingStateRestoreCallback {
+
+        Collection<KeyValue<byte[], byte[]>> restoredRecords;
+
+        @Override
+        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+            restoredRecords = records;
+        }
+
+        @Override
+        public void restore(final byte[] key, final byte[] value) {
+            throw new IllegalStateException("Should not be called");
+
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 25c3cbd..9ac2f99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockRestoreConsumer;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
@@ -76,6 +77,7 @@ public class StandbyTaskTest {
 
     private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
     private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
+    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
 
     private final Set<TopicPartition> topicPartitions = Collections.emptySet();
     private final ProcessorTopology topology = new ProcessorTopology(
@@ -126,7 +128,7 @@ public class StandbyTaskTest {
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, stateRestoreListener);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 6968f33..8abce3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -17,9 +17,14 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.MockRestoreCallback;
+import org.apache.kafka.test.MockStateRestoreListener;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -28,11 +33,19 @@ public class StateRestorerTest {
 
     private static final long OFFSET_LIMIT = 50;
     private final MockRestoreCallback callback = new MockRestoreCallback();
-    private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true);
+    private final MockStateRestoreListener reportingListener = new MockStateRestoreListener();
+    private final CompositeRestoreListener compositeRestoreListener = new CompositeRestoreListener(callback);
+    private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), compositeRestoreListener,
+                                                             null, OFFSET_LIMIT, true, "storeName");
+
+    @Before
+    public void setUp() {
+        compositeRestoreListener.setGlobalRestoreListener(reportingListener);
+    }
 
     @Test
     public void shouldCallRestoreOnRestoreCallback() throws Exception {
-        restorer.restore(new byte[0], new byte[0]);
+        restorer.restore(Collections.singletonList(KeyValue.pair(new byte[0], new byte[0])));
         assertThat(callback.restored.size(), equalTo(1));
     }
 
@@ -53,7 +66,10 @@ public class StateRestorerTest {
 
     @Test
     public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception {
-        final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true);
+        final StateRestorer
+            restorer =
+            new StateRestorer(new TopicPartition("topic", 1), compositeRestoreListener, null, 0, true,
+                              "storeName");
         assertTrue(restorer.hasCompleted(0, 10));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 2ff6d33..be3cb40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,26 +27,39 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.hamcrest.CoreMatchers;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.fail;
 
 public class StoreChangelogReaderTest {
 
-    private final MockRestoreCallback callback = new MockRestoreCallback();
-    private MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0);
+    private final MockStateRestoreListener callback = new MockStateRestoreListener();
+    private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener);
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
     private final PartitionInfo partitionInfo = new PartitionInfo(topicPartition.topic(), 0, null, null, null);
 
+    @Before
+    public void setUp() {
+        restoreListener.setGlobalRestoreListener(stateRestoreListener);
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception {
@@ -56,7 +69,7 @@ public class StoreChangelogReaderTest {
                 throw new TimeoutException("KABOOM!");
             }
         };
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener);
         try {
             changelogReader.validatePartitionExists(topicPartition, "store");
             fail("Should have thrown streams exception");
@@ -80,7 +93,8 @@ public class StoreChangelogReaderTest {
             }
         };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 10);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new
+            MockTime(), 10, stateRestoreListener);
         changelogReader.validatePartitionExists(topicPartition, "store");
     }
 
@@ -93,7 +107,8 @@ public class StoreChangelogReaderTest {
                 throw new TimeoutException("KABOOM!");
             }
         };
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 5);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new
+            MockTime(), 5, stateRestoreListener);
         try {
             changelogReader.validatePartitionExists(topicPartition, "store");
             fail("Should have thrown streams exception");
@@ -119,7 +134,7 @@ public class StoreChangelogReaderTest {
         };
 
         consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000, stateRestoreListener);
         changelogReader.validatePartitionExists(topicPartition, "store");
     }
 
@@ -139,7 +154,8 @@ public class StoreChangelogReaderTest {
     public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+                                                   "storeName"));
 
         changelogReader.restore();
         assertThat(callback.restored.size(), equalTo(messages));
@@ -149,7 +165,8 @@ public class StoreChangelogReaderTest {
     public void shouldRestoreMessagesFromCheckpoint() throws Exception {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true,
+                                                   "storeName"));
 
         changelogReader.restore();
         assertThat(callback.restored.size(), equalTo(5));
@@ -159,7 +176,8 @@ public class StoreChangelogReaderTest {
     public void shouldClearAssignmentAtEndOfRestore() throws Exception {
         final int messages = 1;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+                                                   "storeName"));
 
         changelogReader.restore();
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
@@ -168,7 +186,8 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldRestoreToLimitWhenSupplied() throws Exception {
         setupConsumer(10, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true);
+        final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
+                                                         "storeName");
         changelogReader.register(restorer);
 
         changelogReader.restore();
@@ -182,24 +201,79 @@ public class StoreChangelogReaderTest {
         final TopicPartition two = new TopicPartition("two", 0);
         final MockRestoreCallback callbackOne = new MockRestoreCallback();
         final MockRestoreCallback callbackTwo = new MockRestoreCallback();
+        final CompositeRestoreListener restoreListener1 = new CompositeRestoreListener(callbackOne);
+        final CompositeRestoreListener restoreListener2 = new CompositeRestoreListener(callbackTwo);
+        setupConsumer(10, topicPartition);
+        setupConsumer(5, one);
+        setupConsumer(3, two);
+
+        changelogReader
+            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
+        changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
+        changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
+
+        changelogReader.restore();
+
+        assertThat(callback.restored.size(), equalTo(10));
+        assertThat(callbackOne.restored.size(), equalTo(5));
+        assertThat(callbackTwo.restored.size(), equalTo(3));
+    }
+
+    @Test
+    public void shouldRestoreAndNotifyMultipleStores() throws Exception {
+        final TopicPartition one = new TopicPartition("one", 0);
+        final TopicPartition two = new TopicPartition("two", 0);
+        final MockStateRestoreListener callbackOne = new MockStateRestoreListener();
+        final MockStateRestoreListener callbackTwo = new MockStateRestoreListener();
+        final CompositeRestoreListener restoreListener1 = new CompositeRestoreListener(callbackOne);
+        final CompositeRestoreListener restoreListener2 = new CompositeRestoreListener(callbackTwo);
         setupConsumer(10, topicPartition);
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true));
-        changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true));
+        changelogReader
+            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
+        changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
+        changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
         changelogReader.restore();
 
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackOne.restored.size(), equalTo(5));
         assertThat(callbackTwo.restored.size(), equalTo(3));
+
+        assertAllCallbackStatesExecuted(callback, "storeName1");
+        assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L);
+
+        assertAllCallbackStatesExecuted(callbackOne, "storeName2");
+        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L);
+
+        assertAllCallbackStatesExecuted(callbackTwo, "storeName3");
+        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 3L, 3L);
+    }
+
+    private void assertAllCallbackStatesExecuted(final MockStateRestoreListener restoreListener,
+                                                 final String storeName) {
+        assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(storeName));
+        assertThat(restoreListener.storeNameCalledStates.get(RESTORE_BATCH), equalTo(storeName));
+        assertThat(restoreListener.storeNameCalledStates.get(RESTORE_END), equalTo(storeName));
+    }
+
+
+    private void assertCorrectOffsetsReportedByListener(final MockStateRestoreListener restoreListener,
+                                                        long startOffset,
+                                                        final long batchOffset, final long endOffset) {
+
+        assertThat(restoreListener.restoreStartOffset, equalTo(startOffset));
+        assertThat(restoreListener.restoredBatchOffset, equalTo(batchOffset));
+        assertThat(restoreListener.restoreEndOffset, equalTo(endOffset));
     }
 
     @Test
     public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true);
+        final StateRestorer
+            restorer =
+            new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName");
         setupConsumer(0, topicPartition);
         changelogReader.register(restorer);
 
@@ -212,7 +286,9 @@ public class StoreChangelogReaderTest {
     public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception {
         final Long endOffset = 10L;
         setupConsumer(endOffset, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true);
+        final StateRestorer
+            restorer =
+            new StateRestorer(topicPartition, restoreListener, endOffset, Long.MAX_VALUE, true, "storeName");
 
         changelogReader.register(restorer);
 
@@ -224,7 +300,8 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+                                                   "storeName"));
         changelogReader.restore();
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
@@ -233,7 +310,8 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
+                                                   "storeName"));
         changelogReader.restore();
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -247,7 +325,8 @@ public class StoreChangelogReaderTest {
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
+                                                   "storeName"));
         changelogReader.restore();
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a27fb62..e25da88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -42,12 +42,14 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -107,7 +109,8 @@ public class StreamTaskTest {
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000);
+    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, stateRestoreListener);
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
     private final String applicationId = "applicationId";

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e808eb4..29edf6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
@@ -43,6 +44,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestCondition;
@@ -175,6 +177,7 @@ public class StreamThreadTest {
         private boolean suspended;
         private boolean closed;
         private boolean closedStateManager;
+        private static StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
 
         TestStreamTask(final TaskId id,
                        final String applicationId,
@@ -191,7 +194,7 @@ public class StreamThreadTest {
                 partitions,
                 topology,
                 consumer,
-                new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
+                new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000, stateRestoreListener),
                 config,
                 metrics,
                 stateDirectory,
@@ -1867,7 +1870,8 @@ public class StreamThreadTest {
                     partitions,
                     builder.build(0),
                     clientSupplier.consumer,
-                    new StoreChangelogReader(getName(), clientSupplier.restoreConsumer, mockTime, 1000),
+                    new StoreChangelogReader(getName(), clientSupplier.restoreConsumer, mockTime, 1000,
+                                             new MockStateRestoreListener()),
                     StreamThreadTest.this.config,
                     new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String, String>emptyMap()),
                     stateDirectory) {


Mime
View raw message