kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix standby streamTime (#5288)
Date Tue, 03 Jul 2018 14:08:04 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e38e3a6  MINOR: Fix standby streamTime (#5288)
e38e3a6 is described below

commit e38e3a66ab099996ecb156ec9105869f3d9b9228
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Tue Jul 3 09:07:50 2018 -0500

    MINOR: Fix standby streamTime (#5288)
    
    #5253 broke standby restoration for windowed stores.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../processor/internals/ProcessorStateManager.java |  30 +-
 .../processor/internals/StandbyContextImpl.java    |   7 +-
 .../streams/processor/internals/StandbyTask.java   |  27 +-
 .../streams/processor/internals/StreamThread.java  |   4 +-
 .../integration/RestoreIntegrationTest.java        |   4 +-
 .../internals/ProcessorStateManagerTest.java       |  25 +-
 .../processor/internals/StandbyTaskTest.java       | 472 ++++++++++++++-------
 .../org/apache/kafka/test/MockRestoreConsumer.java |  21 +-
 .../kafkatest/tests/streams/streams_smoke_test.py  |   1 -
 9 files changed, 389 insertions(+), 202 deletions(-)
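
In short: the offset-limit filtering that previously lived in ProcessorStateManager.updateStandbyStates moves into StandbyTask.update, and StandbyContextImpl now tracks a stream time fed from changelog record timestamps instead of throwing, which is what windowed (segmented) stores need while a standby replica restores. A simplified sketch of the reworked update method, condensed from the per-file diffs below:

    // Condensed sketch of the new StandbyTask.update (see the full diff below).
    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
                                                       final List<ConsumerRecord<byte[], byte[]>> records) {
        final long limit = stateMgr.offsetLimit(partition);      // committed-offset bound for this changelog

        long lastOffset = -1L;
        final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>(records.size());
        final List<ConsumerRecord<byte[], byte[]>> remainingRecords = new ArrayList<>();

        for (final ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() < limit) {
                restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                lastOffset = record.offset();
                // advance the standby context's stream time from the record timestamp
                standbyContext.updateStreamTime(record.timestamp());
            } else {
                remainingRecords.add(record);                    // retried once a later commit raises the limit
            }
        }

        stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset);
        return remainingRecords;                                 // empty (never null) once fully caught up
    }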

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index afb56c1..76d1a0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
@@ -173,31 +172,12 @@ public class ProcessorStateManager extends AbstractStateManager {
         return partitionsAndOffsets;
     }
 
-    List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(final TopicPartition storePartition,
-                                                             final List<ConsumerRecord<byte[], byte[]>> records) {
-        final long limit = offsetLimit(storePartition);
-        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
-        final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
-
+    void updateStandbyStates(final TopicPartition storePartition,
+                             final List<KeyValue<byte[], byte[]>> restoreRecords,
+                             final long lastOffset) {
         // restore states from changelog records
         final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreCallbacks.get(storePartition.topic()));
 
-        long lastOffset = -1L;
-        int count = 0;
-        for (final ConsumerRecord<byte[], byte[]> record : records) {
-            if (record.offset() < limit) {
-                restoreRecords.add(KeyValue.pair(record.key(), record.value()));
-                lastOffset = record.offset();
-            } else {
-                if (remainingRecords == null) {
-                    remainingRecords = new ArrayList<>(records.size() - count);
-                }
-
-                remainingRecords.add(record);
-            }
-            count++;
-        }
-
         if (!restoreRecords.isEmpty()) {
             try {
                 restoreCallback.restoreAll(restoreRecords);
@@ -208,8 +188,6 @@ public class ProcessorStateManager extends AbstractStateManager {
 
         // record the restored offset for its change log partition
         standbyRestoredOffsets.put(storePartition, lastOffset + 1);
-
-        return remainingRecords;
     }
 
     void putOffsetLimit(final TopicPartition partition, final long limit) {
@@ -217,7 +195,7 @@ public class ProcessorStateManager extends AbstractStateManager {
         offsetLimits.put(partition, limit);
     }
 
-    private long offsetLimit(final TopicPartition partition) {
+    long offsetLimit(final TopicPartition partition) {
         final Long limit = offsetLimits.get(partition);
         return limit != null ? limit : Long.MAX_VALUE;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index acc6ad6..b01fd5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -69,6 +69,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
             return Collections.emptyMap();
         }
     };
+    private long streamTime = TimestampTracker.NOT_KNOWN;
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
@@ -216,9 +217,13 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
     }
 
+    void updateStreamTime(final long streamTime) {
+        this.streamTime = Math.max(this.streamTime, streamTime);
+    }
+
     @Override
     public long streamTime() {
-        throw new RuntimeException("Stream time is not implemented for the standby context.");
+        return streamTime;
     }
 
 }
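
StandbyContextImpl now reports a stream time instead of throwing, seeded from the changelog record timestamps that StandbyTask feeds it. Segmented window stores consult the observed stream time to decide which segments are still within retention, so a standby context with no stream time could not restore a windowed changelog (the regression from #5253 noted above). Roughly, the liveness decision a windowed store makes looks like this (an illustrative helper, not the actual Segments code):

    // Illustrative only: a segmented window store keeps a segment alive while its end
    // still falls inside the retention window measured against observed stream time.
    boolean segmentIsLive(final long segmentEnd, final long streamTime, final long retentionPeriod) {
        return segmentEnd > streamTime - retentionPeriod;
    }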
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index ddf84fd..72cc629 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,11 +19,13 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,6 +38,7 @@ import java.util.Map;
 public class StandbyTask extends AbstractTask {
 
     private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
+    private final StandbyContextImpl standbyContext;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
@@ -58,8 +61,7 @@ public class StandbyTask extends AbstractTask {
                 final StateDirectory stateDirectory) {
         super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
-        // initialize the topology with its own context
-        processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+        processorContext = standbyContext = new StandbyContextImpl(id, config, stateMgr, metrics);
     }
 
     @Override
@@ -164,7 +166,26 @@ public class StandbyTask extends AbstractTask {
     public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
                                                        final List<ConsumerRecord<byte[], byte[]>> records) {
         log.trace("Updating standby replicas of its state store for partition [{}]", partition);
-        return stateMgr.updateStandbyStates(partition, records);
+        final long limit = stateMgr.offsetLimit(partition);
+
+        long lastOffset = -1L;
+        final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>(records.size());
+        final List<ConsumerRecord<byte[], byte[]>> remainingRecords = new ArrayList<>();
+
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.offset() < limit) {
+                restoreRecords.add(KeyValue.pair(record.key(), record.value()));
+                lastOffset = record.offset();
+                // ideally, we'd use the stream time at the time of the change logging, but we'll settle for
+                // record timestamp for now.
+                standbyContext.updateStreamTime(record.timestamp());
+            } else {
+                remainingRecords.add(record);
+            }
+        }
+
+        stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset);
+        return remainingRecords;
     }
 
     Map<TopicPartition, Long> checkpointedOffsets() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 74ec9aa..f425bb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1069,7 +1069,7 @@ public class StreamThread extends Thread {
                             }
 
                             remaining = task.update(partition, remaining);
-                            if (remaining != null) {
+                            if (!remaining.isEmpty()) {
                                 remainingStandbyRecords.put(partition, remaining);
                             } else {
                                 restoreConsumer.resume(singleton(partition));
@@ -1106,7 +1106,7 @@ public class StreamThread extends Thread {
                         }
 
                         final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
-                        if (remaining != null) {
+                        if (!remaining.isEmpty()) {
                             restoreConsumer.pause(singleton(partition));
                             standbyRecords.put(partition, remaining);
                         }
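
Because update now returns an empty list rather than null when all buffered records were applied, StreamThread keys its bookkeeping off isEmpty(), as in the first hunk above:

    // A partition keeps its buffered records only while some are still beyond the offset limit.
    remaining = task.update(partition, remaining);
    if (!remaining.isEmpty()) {
        remainingStandbyRecords.put(partition, remaining);   // still blocked on the offset limit
    } else {
        restoreConsumer.resume(singleton(partition));        // caught up; resume polling this changelog
    }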
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index dbf85fa..ea9fbad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -36,6 +35,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -367,7 +367,7 @@ public class RestoreIntegrationTest {
 
         }
     }
-    
+
     private void createStateForRestoration(final String changelogTopic) {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 1b03cd4..4287c77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -50,6 +50,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.Collections.singletonList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -119,7 +120,11 @@ public class ProcessorStateManagerTest {
 
         try {
             stateMgr.register(persistentStore, batchingRestoreCallback);
-            stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
+            stateMgr.updateStandbyStates(
+                persistentStorePartition,
+                singletonList(KeyValue.pair(consumerRecord.key(), consumerRecord.value())),
+                consumerRecord.offset()
+            );
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
             assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
         } finally {
@@ -137,7 +142,11 @@ public class ProcessorStateManagerTest {
 
         try {
             stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
-            stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
+            stateMgr.updateStandbyStates(
+                persistentStorePartition,
+                singletonList(KeyValue.pair(consumerRecord.key(), consumerRecord.value())),
+                consumerRecord.offset()
+            );
             assertThat(persistentStore.keys.size(), is(1));
             assertTrue(persistentStore.keys.contains(intKey));
         } finally {
@@ -400,13 +409,11 @@ public class ProcessorStateManagerTest {
 
         stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
         final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
-        stateMgr.updateStandbyStates(persistentStorePartition,
-                                     Collections.singletonList(
-                                             new ConsumerRecord<>(persistentStorePartition.topic(),
-                                                                  persistentStorePartition.partition(),
-                                                                  888L,
-                                                                  bytes,
-                                                                  bytes)));
+        stateMgr.updateStandbyStates(
+            persistentStorePartition,
+            singletonList(KeyValue.pair(bytes, bytes)),
+            888L
+        );
 
         stateMgr.checkpoint(Collections.emptyMap());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 05d0e3d..8f74cd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -25,19 +25,30 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.WindowKeySchema;
 import org.apache.kafka.test.MockRestoreConsumer;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockStateStore;
@@ -58,15 +69,19 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static java.util.Collections.singleton;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkList;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -89,40 +104,36 @@ public class StandbyTaskTest {
 
     private final Set<TopicPartition> topicPartitions = Collections.emptySet();
     private final ProcessorTopology topology = ProcessorTopology.withLocalStores(
-            Utils.mkList(new MockStoreBuilder(storeName1, false).build(), new MockStoreBuilder(storeName2, true).build()),
-            new HashMap<String, String>() {
-                {
-                    put(storeName1, storeChangelogTopicName1);
-                    put(storeName2, storeChangelogTopicName2);
-                }
-            });
+        mkList(new MockStoreBuilder(storeName1, false).build(), new MockStoreBuilder(storeName2, true).build()),
+        mkMap(
+            mkEntry(storeName1, storeChangelogTopicName1),
+            mkEntry(storeName2, storeChangelogTopicName2)
+        )
+    );
     private final TopicPartition globalTopicPartition = new TopicPartition(globalStoreName, 0);
     private final Set<TopicPartition> ktablePartitions = Utils.mkSet(globalTopicPartition);
     private final ProcessorTopology ktableTopology = ProcessorTopology.withLocalStores(
-            Collections.singletonList(new MockStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
-            new HashMap<String, String>() {
-                {
-                    put(globalStoreName, globalTopicPartition.topic());
-                }
-            });
+        singletonList(new MockStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
+        mkMap(
+            mkEntry(globalStoreName, globalTopicPartition.topic())
+        )
+    );
 
     private File baseDir;
     private StateDirectory stateDirectory;
 
     private StreamsConfig createConfig(final File baseDir) throws IOException {
-        return new StreamsConfig(new Properties() {
-            {
-                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
-            }
-        });
+        return new StreamsConfig(mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+            mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()),
+            mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName())
+        )));
     }
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
+    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());
     private final StoreChangelogReader changelogReader = new StoreChangelogReader(
         restoreStateConsumer,
         Duration.ZERO,
@@ -136,16 +147,16 @@ public class StandbyTaskTest {
     @Before
     public void setup() throws Exception {
         restoreStateConsumer.reset();
-        restoreStateConsumer.updatePartitions(storeChangelogTopicName1, Utils.mkList(
-                new PartitionInfo(storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo(storeChangelogTopicName1, 1, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])
+        restoreStateConsumer.updatePartitions(storeChangelogTopicName1, mkList(
+            new PartitionInfo(storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(storeChangelogTopicName1, 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])
         ));
 
-        restoreStateConsumer.updatePartitions(storeChangelogTopicName2, Utils.mkList(
-                new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
+        restoreStateConsumer.updatePartitions(storeChangelogTopicName2, mkList(
+            new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
         ));
         baseDir = TestUtils.tempDirectory();
         stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime());
@@ -158,8 +169,8 @@ public class StandbyTaskTest {
 
     @Test
     public void testStorePartitions() throws IOException {
-        StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        final StreamsConfig config = createConfig(baseDir);
+        final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
@@ -167,228 +178,396 @@ public class StandbyTaskTest {
     @SuppressWarnings("unchecked")
     @Test(expected = ProcessorStateException.class)
     public void testUpdateNonPersistentStore() throws IOException {
-        StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        final StreamsConfig config = createConfig(baseDir);
+        final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         task.update(partition1,
-                records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
+            singletonList(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
         );
 
     }
 
     @Test
     public void testUpdate() throws IOException {
-        StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        final StreamsConfig config = createConfig(baseDir);
+        final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initializeStateStores();
         final Set<TopicPartition> partition = Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
-        for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
+        for (final ConsumerRecord<Integer, Integer> record : Arrays.asList(
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
             restoreStateConsumer.bufferRecord(record);
         }
 
         restoreStateConsumer.seekToBeginning(partition);
         task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));
 
-        StandbyContextImpl context = (StandbyContextImpl) task.context();
-        MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
-        MockStateStore store2 = (MockStateStore) context.getStateMgr().getStore(storeName2);
+        final StandbyContextImpl context = (StandbyContextImpl) task.context();
+        final MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
+        final MockStateStore store2 = (MockStateStore) context.getStateMgr().getStore(storeName2);
 
         assertEquals(Collections.emptyList(), store1.keys);
-        assertEquals(Utils.mkList(1, 2, 3), store2.keys);
+        assertEquals(mkList(1, 2, 3), store2.keys);
+    }
 
-        task.closeStateManager(true);
+    @Test
+    public void shouldRestoreToWindowedStores() throws IOException {
+        final String storeName = "windowed-store";
+        final String changelogName = applicationId + "-" + storeName + "-changelog";
 
-        File taskDir = stateDirectory.directoryForTask(taskId);
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        Map<TopicPartition, Long> offsets = checkpoint.read();
+        final TopicPartition topicPartition = new TopicPartition(changelogName, 1);
 
-        assertEquals(1, offsets.size());
-        assertEquals(new Long(30L + 1L), offsets.get(partition2));
+        final List<TopicPartition> partitions = mkList(topicPartition);
+
+        consumer.assign(partitions);
+
+        final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId(applicationId);
+
+        new InternalStreamsBuilder(internalTopologyBuilder)
+            .stream(Collections.singleton("topic"), new ConsumedInternal<>())
+            .groupByKey()
+            .windowedBy(TimeWindows.of(60_000).until(120_000))
+            .count(Materialized.as(storeName));
+
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            partitions,
+            internalTopologyBuilder.build(0),
+            consumer,
+            new StoreChangelogReader(
+                restoreStateConsumer,
+                Duration.ZERO,
+                stateRestoreListener,
+                new LogContext("standby-task-test ")
+            ),
+            createConfig(baseDir),
+            new MockStreamsMetrics(new Metrics()),
+            stateDirectory
+        );
+
+        task.initializeStateStores();
+
+        consumer.commitSync(mkMap(mkEntry(topicPartition, new OffsetAndMetadata(35L))));
+        task.commit();
+
+        final List<ConsumerRecord<byte[], byte[]>> remaining1 = task.update(
+            topicPartition,
+            Arrays.asList(
+                makeWindowedConsumerRecord(changelogName, 10, 1, 0L, 60_000L),
+                makeWindowedConsumerRecord(changelogName, 20, 2, 60_000L, 120_000),
+                makeWindowedConsumerRecord(changelogName, 30, 3, 120_000L, 180_000),
+                makeWindowedConsumerRecord(changelogName, 40, 4, 180_000L, 240_000)
+            )
+        );
+
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(new Windowed<>(1, new TimeWindow(0, 60_000)), 100L),
+                new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L),
+                new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L)
+            ),
+            getWindowedStoreContents(storeName, task)
+        );
+
+        consumer.commitSync(mkMap(mkEntry(topicPartition, new OffsetAndMetadata(45L))));
+        task.commit();
+
+        final List<ConsumerRecord<byte[], byte[]>> remaining2 = task.update(topicPartition, remaining1);
+        assertEquals(emptyList(), remaining2);
+
+        // the first record's window should have expired.
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L),
+                new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L),
+                new KeyValue<>(new Windowed<>(4, new TimeWindow(180_000, 240_000)), 100L)
+            ),
+            getWindowedStoreContents(storeName, task)
+        );
+    }
+
+    private ConsumerRecord<byte[], byte[]> makeWindowedConsumerRecord(final String changelogName,
+                                                                      final int offset,
+                                                                      final int key,
+                                                                      final long start,
+                                                                      final long end) {
+        final Windowed<Integer> data = new Windowed<>(key, new TimeWindow(start, end));
+        final Bytes wrap = Bytes.wrap(new IntegerSerializer().serialize(null, data.key()));
+        final byte[] keyBytes = WindowKeySchema.toStoreKeyBinary(new Windowed<>(wrap, data.window()), 1).get();
+        return new ConsumerRecord<>(
+            changelogName,
+            1,
+            offset,
+            start,
+            TimestampType.CREATE_TIME,
+            0L,
+            0,
+            0,
+            keyBytes,
+            new LongSerializer().serialize(null, 100L)
+        );
     }
 
     @Test
-    public void testUpdateKTable() throws IOException {
-        consumer.assign(Utils.mkList(globalTopicPartition));
-        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(0L));
-        consumer.commitSync(committedOffsets);
+    public void shouldWriteCheckpointFile() throws IOException {
+        final String storeName = "checkpoint-file-store";
+        final String changelogName = applicationId + "-" + storeName + "-changelog";
 
-        restoreStateConsumer.updatePartitions(globalStoreName, Utils.mkList(
-                new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo(globalStoreName, 1, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo(globalStoreName, 2, Node.noNode(), new Node[0], new Node[0])
-        ));
+        final TopicPartition topicPartition = new TopicPartition(changelogName, 1);
+        final List<TopicPartition> partitions = mkList(topicPartition);
+
+        final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId(applicationId);
+
+        new InternalStreamsBuilder(internalTopologyBuilder)
+            .stream(Collections.singleton("topic"), new ConsumedInternal<>())
+            .groupByKey()
+            .count(Materialized.as(storeName));
 
-        StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, ktablePartitions, ktableTopology, consumer, changelogReader, config, null, stateDirectory);
+        consumer.assign(partitions);
+
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            partitions,
+            internalTopologyBuilder.build(0),
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            new MockStreamsMetrics(new Metrics()),
+            stateDirectory
+        );
         task.initializeStateStores();
-        restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
-        for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100),
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100),
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) {
-            restoreStateConsumer.bufferRecord(record);
-        }
+        consumer.commitSync(mkMap(mkEntry(topicPartition, new OffsetAndMetadata(20L))));
+        task.commit();
+
+        task.update(
+            topicPartition,
+            singletonList(makeWindowedConsumerRecord(changelogName, 10, 1, 0L, 60_000L))
+        );
+
+        task.closeStateManager(true);
 
-        for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
-            TopicPartition partition = entry.getKey();
-            long offset = entry.getValue();
-            if (offset >= 0) {
-                restoreStateConsumer.seek(partition, offset);
-            } else {
-                restoreStateConsumer.seekToBeginning(singleton(partition));
+        final File taskDir = stateDirectory.directoryForTask(taskId);
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        final Map<TopicPartition, Long> offsets = checkpoint.read();
+
+        assertEquals(1, offsets.size());
+        assertEquals(new Long(11L), offsets.get(topicPartition));
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName, final StandbyTask task) {
+        final StandbyContextImpl context = (StandbyContextImpl) task.context();
+
+        final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>();
+
+        try (final KeyValueIterator<Windowed<byte[]>, Long> iterator =
+                 ((WindowStore) context.getStateMgr().getStore(storeName)).all()) {
+
+            while (iterator.hasNext()) {
+                final KeyValue<Windowed<byte[]>, Long> next = iterator.next();
+                final Integer deserializedKey = new IntegerDeserializer().deserialize(null, next.key.key());
+                result.add(new KeyValue<>(new Windowed<>(deserializedKey, next.key.window()), next.value));
             }
         }
 
+        return result;
+    }
+
+    @Test
+    public void shouldRestoreToKTable() throws IOException {
+        consumer.assign(mkList(globalTopicPartition));
+        consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(0L))));
+
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            null,
+            stateDirectory
+        );
+        task.initializeStateStores();
+
         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(
+            globalTopicPartition,
+            Arrays.asList(
+                makeConsumerRecord(globalTopicPartition, 10, 1),
+                makeConsumerRecord(globalTopicPartition, 20, 2),
+                makeConsumerRecord(globalTopicPartition, 30, 3),
+                makeConsumerRecord(globalTopicPartition, 40, 4),
+                makeConsumerRecord(globalTopicPartition, 50, 5)
+            )
+        );
         assertEquals(5, remaining.size());
 
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
-        consumer.commitSync(committedOffsets);
+        consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(10L))));
         task.commit(); // update offset limits
 
         // The commit offset has not reached, yet.
         remaining = task.update(globalTopicPartition, remaining);
         assertEquals(5, remaining.size());
 
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(11L));
-        consumer.commitSync(committedOffsets);
+        consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(11L))));
         task.commit(); // update offset limits
 
         // one record should be processed.
         remaining = task.update(globalTopicPartition, remaining);
         assertEquals(4, remaining.size());
 
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(45L));
-        consumer.commitSync(committedOffsets);
+        consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(45L))));
         task.commit(); // update offset limits
 
         // The commit offset is now 45. All record except for the last one should be processed.
         remaining = task.update(globalTopicPartition, remaining);
         assertEquals(1, remaining.size());
 
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(50L));
-        consumer.commitSync(committedOffsets);
+        consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(50L))));
         task.commit(); // update offset limits
 
         // The commit offset is now 50. Still the last record remains.
         remaining = task.update(globalTopicPartition, remaining);
         assertEquals(1, remaining.size());
 
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(60L));
-        consumer.commitSync(committedOffsets);
+        consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(60L))));
         task.commit(); // update offset limits
 
         // The commit offset is now 60. No record should be left.
         remaining = task.update(globalTopicPartition, remaining);
-        assertNull(remaining);
-
-        task.closeStateManager(true);
+        assertEquals(emptyList(), remaining);
+    }
 
-        File taskDir = stateDirectory.directoryForTask(taskId);
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        Map<TopicPartition, Long> offsets = checkpoint.read();
+    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition, final long offset, final int key) {
+        final IntegerSerializer integerSerializer = new IntegerSerializer();
+        return new ConsumerRecord<>(
+            topicPartition.topic(),
+            topicPartition.partition(),
+            offset,
+            0L,
+            TimestampType.CREATE_TIME,
+            0L,
+            0,
+            0,
+            integerSerializer.serialize(null, key),
+            integerSerializer.serialize(null, 100)
+        );
+    }
 
-        assertEquals(1, offsets.size());
-        assertEquals(new Long(51L), offsets.get(globalTopicPartition));
+    @Test
+    public void shouldInitializeStateStoreWithoutException() throws IOException {
+        final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count();
 
+        initializeStandbyStores(builder);
     }
 
     @Test
-    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws IOException {
-        final String changelogName = "test-application-my-store-changelog";
-        final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
-        consumer.assign(partitions);
-        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
-        consumer.commitSync(committedOffsets);
-
-        restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
-                new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
+    public void shouldInitializeWindowStoreWithoutException() throws IOException {
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count();
+        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(100)).count();
+
+        initializeStandbyStores(builder);
+    }
 
+    private void initializeStandbyStores(final InternalStreamsBuilder builder) throws IOException {
         final StreamsConfig config = createConfig(baseDir);
         final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
         final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
 
-        new StandbyTask(taskId, partitions, topology, consumer, changelogReader, config,
-            new MockStreamsMetrics(new Metrics()), stateDirectory);
+        final StandbyTask standbyTask = new StandbyTask(
+            taskId,
+            emptySet(),
+            topology,
+            consumer,
+            changelogReader,
+            config,
+            new MockStreamsMetrics(new Metrics()),
+            stateDirectory
+        );
+
+        standbyTask.initializeStateStores();
+
+        assertTrue(standbyTask.hasStateStores());
     }
 
     @Test
     public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
-        consumer.assign(Utils.mkList(globalTopicPartition));
+        consumer.assign(mkList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
-        restoreStateConsumer.updatePartitions(globalStoreName, Utils.mkList(
-                new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0])));
+        restoreStateConsumer.updatePartitions(
+            globalStoreName,
+            mkList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
+        );
 
         final TaskId taskId = new TaskId(0, 0);
         final MockTime time = new MockTime();
         final StreamsConfig config = createConfig(baseDir);
-        final StandbyTask task = new StandbyTask(taskId,
-                                                 ktablePartitions,
-                                                 ktableTopology,
-                                                 consumer,
-                                                 changelogReader,
-                                                 config,
-                                                 null,
-                                                 stateDirectory
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            config,
+            null,
+            stateDirectory
         );
         task.initializeStateStores();
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
-        task.update(globalTopicPartition, Collections.singletonList(new ConsumerRecord<>(globalTopicPartition.topic(),
-                                                                           globalTopicPartition.partition(),
-                                                                           50L,
-                                                                           serializedValue,
-                                                                           serializedValue)));
+        task.update(
+            globalTopicPartition,
+            singletonList(
+                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 50L, serializedValue, serializedValue)
+            )
+        );
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
         task.commit();
 
-        final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
-                                                                                   ProcessorStateManager.CHECKPOINT_FILE_NAME)).read();
+        final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(
+            new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)
+        ).read();
         assertThat(checkpoint, equalTo(Collections.singletonMap(globalTopicPartition, 51L)));
 
     }
 
     @Test
     public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
-        consumer.assign(Utils.mkList(globalTopicPartition));
+        consumer.assign(mkList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
-        restoreStateConsumer.updatePartitions(globalStoreName, Utils.mkList(
-                new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0])));
+        restoreStateConsumer.updatePartitions(
+            globalStoreName,
+            mkList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
+        );
 
         final StreamsConfig config = createConfig(baseDir);
         final AtomicBoolean closedStateManager = new AtomicBoolean(false);
-        final StandbyTask task = new StandbyTask(taskId,
-                                                 ktablePartitions,
-                                                 ktableTopology,
-                                                 consumer,
-                                                 changelogReader,
-                                                 config,
-                                                 null,
-                                                 stateDirectory
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            config,
+            null,
+            stateDirectory
         ) {
             @Override
             public void commit() {
@@ -404,13 +583,10 @@ public class StandbyTaskTest {
         try {
             task.close(true, false);
             fail("should have thrown exception");
-        } catch (Exception e) {
+        } catch (final Exception e) {
             // expected
         }
         assertTrue(closedStateManager.get());
     }
 
-    private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
-        return Arrays.asList(recs);
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 00788fd..78b4ff8 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -21,8 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.time.Duration;
@@ -31,8 +29,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
-public class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
-    private final Serializer<Integer> serializer = new IntegerSerializer();
+public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
 
     private TopicPartition assignedPartition = null;
     private long seekOffset = -1L;
@@ -41,10 +40,12 @@ public class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
 
     private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>();
 
-    public MockRestoreConsumer() {
+    public MockRestoreConsumer(final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
         super(OffsetResetStrategy.EARLIEST);
 
         reset();
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
     }
 
     // reset this mock restore consumer for a state store registration
@@ -56,12 +57,12 @@ public class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
     }
 
     // buffer a record (we cannot use addRecord because we need to add records before assigning a partition)
-    public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
+    public void bufferRecord(ConsumerRecord<K, V> record) {
         recordBuffer.add(
-            new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), 0L,
-                                 TimestampType.CREATE_TIME, 0L, 0, 0,
-                                 serializer.serialize(record.topic(), record.key()),
-                                 serializer.serialize(record.topic(), record.value())));
+            new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.timestamp(),
+                                 record.timestampType(), 0L, 0, 0,
+                                 keySerializer.serialize(record.topic(), record.key()),
+                                 valueSerializer.serialize(record.topic(), record.value())));
         endOffset = record.offset();
 
         super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index ae3de1b..496c495 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -46,7 +46,6 @@ class StreamsSmokeTest(KafkaTest):
         self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @ignore # ignore this test until StandbyTask supports streamTime
     @cluster(num_nodes=9)
     def test_streams(self):
         """

