kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-6967: TopologyTestDriver does not allow pre-populating state stores that have change logging (#5096)
Date Wed, 06 Jun 2018 22:17:32 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new a0dd05c  KAFKA-6967: TopologyTestDriver does not allow pre-populating state stores that have change logging (#5096)
a0dd05c is described below

commit a0dd05c2c7947d18e71e2ea1bb83683509fe57fe
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Jun 6 15:16:21 2018 -0700

    KAFKA-6967: TopologyTestDriver does not allow pre-populating state stores that have change logging (#5096)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, James Cheng <jylcheng@yahoo.com>, Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>
---
 build.gradle                                       |   1 +
 .../kstream/internals/KGroupedTableImplTest.java   |   4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  69 +++++------
 .../streams/processor/MockProcessorContext.java    |  35 +++++-
 .../kafka/streams/test/ConsumerRecordFactory.java  |  31 ++++-
 .../apache/kafka/streams/test/OutputVerifier.java  |  14 +++
 .../kafka/streams/MockProcessorContextTest.java    |  11 +-
 .../kafka/streams/TopologyTestDriverTest.java      | 133 ++++++++++++---------
 8 files changed, 190 insertions(+), 108 deletions(-)

diff --git a/build.gradle b/build.gradle
index 4f3fd77..14479f1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1017,6 +1017,7 @@ project(':streams:test-utils') {
 
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.junit
+    testCompile libs.easymock
 
     testRuntime libs.slf4jlog4j
   }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 4b8298f..79e0b42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -199,7 +199,7 @@ public class KGroupedTableImplTest {
         final Map<String, Integer> results = getReducedResults(reduced);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             assertReduced(results, topic, driver);
-            final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
+            final KeyValueStore<String, Integer> reduce = driver.getKeyValueStore("reduce");
             assertThat(reduce.get("A"), equalTo(5));
             assertThat(reduce.get("B"), equalTo(6));
         }
@@ -240,7 +240,7 @@ public class KGroupedTableImplTest {
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             processData(topic, driver);
-            final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
+            final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
             assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
             assertThat(aggregate.get("2"), equalTo("0+2+2"));
         }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e46ec6a..773cbb4 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -170,7 +171,7 @@ import java.util.regex.Pattern;
 @InterfaceStability.Evolving
 public class TopologyTestDriver implements Closeable {
 
-    private final Time mockTime;
+    private final Time mockWallClockTime;
     private final InternalTopologyBuilder internalTopologyBuilder;
 
     private final static int PARTITION_ID = 0;
@@ -179,6 +180,8 @@ public class TopologyTestDriver implements Closeable {
     private final GlobalStateUpdateTask globalStateTask;
     private final GlobalStateManager globalStateManager;
 
+    private final InternalProcessorContext context;
+
     private final StateDirectory stateDirectory;
     private final Metrics metrics;
     private final ProcessorTopology processorTopology;
@@ -216,7 +219,6 @@ public class TopologyTestDriver implements Closeable {
     public TopologyTestDriver(final Topology topology,
                               final Properties config,
                               final long initialWallClockTimeMs) {
-
         this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
     }
 
@@ -225,25 +227,13 @@ public class TopologyTestDriver implements Closeable {
      *
      * @param builder builder for the topology to be tested
      * @param config the configuration for the topology
-     */
-    TopologyTestDriver(final InternalTopologyBuilder builder,
-                       final Properties config) {
-        this(builder, config,  System.currentTimeMillis());
-
-    }
-
-    /**
-     * Create a new test diver instance.
-     *
-     * @param builder builder for the topology to be tested
-     * @param config the configuration for the topology
      * @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
      */
     private TopologyTestDriver(final InternalTopologyBuilder builder,
-                              final Properties config,
-                              final long initialWallClockTimeMs) {
+                               final Properties config,
+                               final long initialWallClockTimeMs) {
         final StreamsConfig streamsConfig = new StreamsConfig(config);
-        mockTime = new MockTime(initialWallClockTimeMs);
+        mockWallClockTime = new MockTime(initialWallClockTimeMs);
 
         internalTopologyBuilder = builder;
         internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
@@ -260,7 +250,7 @@ public class TopologyTestDriver implements Closeable {
         };
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        stateDirectory = new StateDirectory(streamsConfig, mockTime);
+        stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
         metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
@@ -323,6 +313,7 @@ public class TopologyTestDriver implements Closeable {
                 new LogContext()
             );
             globalStateTask.initialize();
+            globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -342,12 +333,15 @@ public class TopologyTestDriver implements Closeable {
                 streamsMetrics,
                 stateDirectory,
                 cache,
-                mockTime,
+                mockWallClockTime,
                 producer);
             task.initializeStateStores();
             task.initializeTopology();
+            context = (InternalProcessorContext) task.context();
+            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
         } else {
             task = null;
+            context = null;
         }
     }
 
@@ -356,6 +350,7 @@ public class TopologyTestDriver implements Closeable {
      *
      * @return Map of all metrics.
      */
+    @SuppressWarnings("WeakerAccess")
     public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(metrics.metrics());
     }
@@ -390,13 +385,10 @@ public class TopologyTestDriver implements Closeable {
                 consumerRecord.headers())));
 
             // Process the record ...
-            ((InternalProcessorContext) task.context()).setRecordContext(
-                    new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName, consumerRecord.headers()));
             task.process();
             task.maybePunctuateStreamTime();
             task.commit();
             captureOutputRecords();
-
         } else {
             final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
             if (globalTopicPartition == null) {
@@ -446,12 +438,7 @@ public class TopologyTestDriver implements Closeable {
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
         producer.clear();
         for (final ProducerRecord<byte[], byte[]> record : output) {
-            Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
-            if (outputRecords == null) {
-                outputRecords = new LinkedList<>();
-                outputRecordsByTopic.put(record.topic(), outputRecords);
-            }
-            outputRecords.add(record);
+            outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new LinkedList<>()).add(record);
 
             // Forward back into the topology if the produced record is to an internal or a source topic ...
             final String outputTopicName = record.topic();
@@ -497,7 +484,7 @@ public class TopologyTestDriver implements Closeable {
      */
     @SuppressWarnings("WeakerAccess")
     public void advanceWallClockTime(final long advanceMs) {
-        mockTime.sleep(advanceMs);
+        mockWallClockTime.sleep(advanceMs);
         if (task != null) {
             task.maybePunctuateSystemTime();
             task.commit();
@@ -549,6 +536,8 @@ public class TopologyTestDriver implements Closeable {
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
      * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     * <p>
+     * Note that {@code StateStore} might be {@code null} if a store is added but not connected to any processor.
      *
      * @return all stores by name
      * @see #getStateStore(String)
@@ -579,13 +568,24 @@ public class TopologyTestDriver implements Closeable {
      * @see #getWindowStore(String)
      * @see #getSessionStore(String)
      */
+    @SuppressWarnings("WeakerAccess")
     public StateStore getStateStore(final String name) {
-        StateStore stateStore = task == null ? null :
-            ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
-        if (stateStore == null && globalStateManager != null) {
-            stateStore = globalStateManager.getGlobalStore(name);
+        if (task != null) {
+            final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+            if (stateStore != null) {
+                return stateStore;
+            }
         }
-        return stateStore;
+
+        if (globalStateManager != null) {
+            final StateStore stateStore = globalStateManager.getGlobalStore(name);
+            if (stateStore != null) {
+                return stateStore;
+            }
+
+        }
+
+        return null;
     }
 
     /**
@@ -651,6 +651,7 @@ public class TopologyTestDriver implements Closeable {
     /**
      * Close the driver, its topology, and all processors.
      */
+    @SuppressWarnings("WeakerAccess")
     public void close() {
         if (task != null) {
             task.close(true, false);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index b14a791..cba0257 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -86,23 +86,27 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
             this.punctuator = punctuator;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public long getIntervalMs() {
             return intervalMs;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public PunctuationType getType() {
             return type;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public Punctuator getPunctuator() {
             return punctuator;
         }
 
-        @SuppressWarnings("WeakerAccess")
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public void cancel() {
             cancelled = true;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public boolean cancelled() {
             return cancelled;
         }
@@ -127,6 +131,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
          *
          * @return The child name, or {@code null} if it was broadcast.
          */
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public String childName() {
             return childName;
         }
@@ -136,6 +141,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
          *
          * @return A timestamp, or {@code -1} if none was forwarded.
          */
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public long timestamp() {
             return timestamp;
         }
@@ -145,6 +151,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
          *
          * @return A key/value pair. Not null.
          */
+        @SuppressWarnings({"WeakerAccess", "unused"})
         public KeyValue keyValue() {
             return keyValue;
         }
@@ -158,6 +165,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      * and most unit tests should be able to get by with the
      * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext() {
         //noinspection DoubleBraceInitialization
         this(
@@ -179,6 +187,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      *
      * @param config a Properties object, used to configure the context and the processor.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext(final Properties config) {
         this(config, new TaskId(0, 0), null);
     }
@@ -190,6 +199,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
      * @param stateDir a {@link File}, which the context makes available via {@link MockProcessorContext#stateDir()}.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
         final StreamsConfig streamsConfig = new StreamsConfig(config);
         this.taskId = taskId;
@@ -252,6 +262,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      * @param offset    A record offset
      * @param timestamp A record timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp) {
         this.topic = topic;
         this.partition = partition;
@@ -260,13 +271,13 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         this.timestamp = timestamp;
     }
 
-
     /**
      * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
      * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
      *
      * @param topic A topic name
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setTopic(final String topic) {
         this.topic = topic;
     }
@@ -277,21 +288,29 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      *
      * @param partition A partition number
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setPartition(final int partition) {
         this.partition = partition;
     }
 
-
     /**
      * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
      * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
      *
      * @param offset A record offset
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setOffset(final long offset) {
         this.offset = offset;
     }
 
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param headers Record headers
+     */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setHeaders(final Headers headers) {
         this.headers = headers;
     }
@@ -302,6 +321,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      *
      * @param timestamp A record timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void setTimestamp(final long timestamp) {
         this.timestamp = timestamp;
     }
@@ -345,7 +365,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     // mocks ================================================
 
-
     @Override
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
@@ -376,6 +395,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      *
      * @return A list of captured punctuators.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedPunctuator> scheduledPunctuators() {
         final LinkedList<CapturedPunctuator> capturedPunctuators = new LinkedList<>();
         capturedPunctuators.addAll(punctuators);
@@ -394,6 +414,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException(
@@ -402,6 +423,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException(
@@ -417,6 +439,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      *
      * @return A list of key/value pairs that were previously passed to the context.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedForward> forwarded() {
         final LinkedList<CapturedForward> result = new LinkedList<>();
         result.addAll(capturedForwards);
@@ -431,6 +454,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      * @param childName The child name to retrieve forwards for
      * @return A list of key/value pairs that were previously passed to the context.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedForward> forwarded(final String childName) {
         final LinkedList<CapturedForward> result = new LinkedList<>();
         for (final CapturedForward capture : capturedForwards) {
@@ -444,6 +468,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     /**
      * Clear the captured forwarded data.
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void resetForwards() {
         capturedForwards.clear();
     }
@@ -458,6 +483,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      *
      * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
      */
+    @SuppressWarnings("WeakerAccess")
     public boolean committed() {
         return committed;
     }
@@ -465,6 +491,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     /**
      * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void resetCommit() {
         committed = false;
     }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index 507249d..108dafd 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -44,7 +44,7 @@ public class ConsumerRecordFactory<K, V> {
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
     private long timeMs;
-    private long advanceMs;
+    private final long advanceMs;
 
     /**
      * Create a new factory for the given topic.
@@ -54,6 +54,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param keySerializer the key serializer
      * @param valueSerializer the value serializer
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer) {
         this(null, keySerializer, valueSerializer, System.currentTimeMillis());
@@ -68,6 +69,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param keySerializer the key serializer
      * @param valueSerializer the value serializer
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final String defaultTopicName,
                                  final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer) {
@@ -82,6 +84,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param valueSerializer the value serializer
      * @param startTimestampMs the initial timestamp for generated records
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
                                  final long startTimestampMs) {
@@ -97,6 +100,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param valueSerializer the value serializer
      * @param startTimestampMs the initial timestamp for generated records
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final String defaultTopicName,
                                  final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
@@ -112,6 +116,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param startTimestampMs the initial timestamp for generated records
      * @param autoAdvanceMs the time increment per generated record
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
                                  final long startTimestampMs,
@@ -128,6 +133,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param startTimestampMs the initial timestamp for generated records
      * @param autoAdvanceMs the time increment per generated record
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecordFactory(final String defaultTopicName,
                                  final Serializer<K> keySerializer,
                                  final Serializer<V> valueSerializer,
@@ -147,6 +153,7 @@ public class ConsumerRecordFactory<K, V> {
      *
      * @param advanceMs the amount of time to advance
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public void advanceTimeMs(final long advanceMs) {
         if (advanceMs < 0) {
             throw new IllegalArgumentException("advanceMs must be positive");
@@ -165,6 +172,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
@@ -198,6 +206,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
@@ -214,6 +223,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final long timestampMs) {
@@ -230,6 +240,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final Headers headers,
@@ -250,6 +261,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value) {
@@ -268,6 +280,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
@@ -285,6 +298,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value) {
         return create(key, value, new RecordHeaders());
@@ -299,6 +313,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final Headers headers) {
@@ -318,6 +333,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
                                                  final long timestampMs) {
@@ -334,6 +350,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
                                                  final Headers headers,
@@ -349,6 +366,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value,
                                                  final long timestampMs) {
         return create(value, new RecordHeaders(), timestampMs);
@@ -363,6 +381,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value,
                                                  final Headers headers,
                                                  final long timestampMs) {
@@ -382,6 +401,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
                                                  final Headers headers) {
@@ -396,6 +416,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value) {
         return create(topicName, null, value, new RecordHeaders());
@@ -408,6 +429,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value) {
         return create(value, new RecordHeaders());
     }
@@ -420,6 +442,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param headers the record headers
      * @return the generated {@link ConsumerRecord}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public ConsumerRecord<byte[], byte[]> create(final V value,
                                                  final Headers headers) {
         if (topicName == null) {
@@ -437,6 +460,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param keyValues the record keys and values
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
                                                        final List<KeyValue<K, V>> keyValues) {
         final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
@@ -455,6 +479,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param keyValues the record keys and values
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
@@ -474,6 +499,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param advanceMs the time difference between two consecutive generated records
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
                                                        final List<KeyValue<K, V>> keyValues,
                                                        final long startTimestamp,
@@ -502,6 +528,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param advanceMs the time difference between two consecutive generated records
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
                                                        final long startTimestamp,
                                                        final long advanceMs) {
@@ -523,6 +550,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param startTimestamp the timestamp for the first generated record
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
                                                        final List<KeyValue<K, V>> keyValues,
                                                        final long startTimestamp) {
@@ -538,6 +566,7 @@ public class ConsumerRecordFactory<K, V> {
      * @param startTimestamp the timestamp for the first generated record
      * @return the generated {@link ConsumerRecord consumer records}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
                                                        final long startTimestamp) {
         if (topicName == null) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
index aedb910..f78e926 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -39,6 +39,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedValue}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValue(final ProducerRecord<K, V> record,
                                            final V expectedValue) throws AssertionError {
         Objects.requireNonNull(record);
@@ -65,6 +66,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedRecord}'s value
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValue(final ProducerRecord<K, V> record,
                                            final ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -82,6 +84,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedKey} or {@code expectedValue}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
                                               final K expectedKey,
                                               final V expectedValue) throws AssertionError {
@@ -119,6 +122,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedRecord}'s key or value
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
                                               final ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -136,6 +140,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedValue} or {@code expectedTimestamp}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
                                                     final V expectedValue,
                                                     final long expectedTimestamp) throws AssertionError {
@@ -169,6 +174,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedRecord}'s value or timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
                                                     final ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -189,6 +195,7 @@ public class OutputVerifier {
      * @throws AssertionError if {@code ProducerRecord}'s key, value, timestamp is not equal to {@code expectedKey},
      * {@code expectedValue}, or {@code expectedTimestamps}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
                                                        final K expectedKey,
                                                        final V expectedValue,
@@ -233,6 +240,7 @@ public class OutputVerifier {
      * @throws AssertionError if {@code ProducerRecord}'s key, value, or timestamp is not equal to
      * {@code expectedRecord}'s key, value, or timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
                                                        final ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -250,6 +258,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedValue} or {@code expectedHeaders}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
                                                   final V expectedValue,
                                                   final Headers expectedHeaders) throws AssertionError {
@@ -287,6 +296,7 @@ public class OutputVerifier {
      * @param <V> the value type
      * @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedRecord}'s value or headers
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
                                                   final ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -307,6 +317,7 @@ public class OutputVerifier {
      * @throws AssertionError if {@code ProducerRecord}'s key, value, headers is not equal to {@code expectedKey},
      *                        {@code expectedValue}, or {@code expectedHeaders}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
                                                      final K expectedKey,
                                                      final V expectedValue,
@@ -355,6 +366,7 @@ public class OutputVerifier {
      * @throws AssertionError if {@code ProducerRecord}'s key, value, or headers is not equal to
      *                        {@code expectedRecord}'s key, value, or headers
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
                                                      final ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
@@ -376,6 +388,7 @@ public class OutputVerifier {
      * @throws AssertionError if {@code ProducerRecord}'s key, value, headers is not equal to {@code expectedKey},
      *                        {@code expectedValue}, or {@code expectedHeaders}
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
                                                               final K expectedKey,
                                                               final V expectedValue,
@@ -432,6 +445,7 @@ public class OutputVerifier {
      * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to
      *                        {@code expectedRecord}'s key, value, headers, or timestamp
      */
+    @SuppressWarnings({"WeakerAccess", "unused"})
     public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
                                                               final ProducerRecord<K, V> expectedRecord) throws AssertionError {
         Objects.requireNonNull(expectedRecord);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 64d5b12..878aa35 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -156,9 +156,9 @@ public class MockProcessorContextTest {
     @Test
     public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @SuppressWarnings("deprecation")
             @Override
             public void process(final String key, final Long value) {
-                //noinspection deprecation
                 context().forward(key, value, 0);
             }
         };
@@ -178,9 +178,9 @@ public class MockProcessorContextTest {
     @Test
     public void shouldThrowIfForwardedWithDeprecatedChildName() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @SuppressWarnings("deprecation")
             @Override
             public void process(final String key, final Long value) {
-                //noinspection deprecation
                 context().forward(key, value, "child1");
             }
         };
@@ -347,12 +347,7 @@ public class MockProcessorContextTest {
                 context.schedule(
                     1000L,
                     PunctuationType.WALL_CLOCK_TIME,
-                    new Punctuator() {
-                        @Override
-                        public void punctuate(final long timestamp) {
-                            context.commit();
-                        }
-                    }
+                    timestamp -> context.commit()
                 );
             }
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 2d446d1..7552637 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -31,14 +31,15 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -64,6 +65,7 @@ import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -100,26 +102,27 @@ public class TopologyTestDriverTest {
     };
     private KeyValueStore<String, Long> store;
 
-    private StringDeserializer stringDeserializer = new StringDeserializer();
-    private LongDeserializer longDeserializer = new LongDeserializer();
-    private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(
+    private final StringDeserializer stringDeserializer = new StringDeserializer();
+    private final LongDeserializer longDeserializer = new LongDeserializer();
+    private final ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(
         new StringSerializer(),
         new LongSerializer());
 
 
     private final static class Record {
-        private Object key;
-        private Object value;
-        private long timestamp;
-        private long offset;
-        private String topic;
-        private Headers headers;
-
-        Record(final ConsumerRecord consumerRecord) {
+        private final Object key;
+        private final Object value;
+        private final long timestamp;
+        private final long offset;
+        private final String topic;
+        private final Headers headers;
+
+        Record(final ConsumerRecord consumerRecord,
+               final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
             timestamp = consumerRecord.timestamp();
-            offset = consumerRecord.offset();
+            offset = newOffset;
             topic = consumerRecord.topic();
             headers = consumerRecord.headers();
         }
@@ -184,7 +187,7 @@ public class TopologyTestDriverTest {
         private final List<Long> punctuatedAt = new LinkedList<>();
 
         @Override
-        public void punctuate(long timestamp) {
+        public void punctuate(final long timestamp) {
             punctuatedAt.add(timestamp);
         }
     }
@@ -202,7 +205,7 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             initialized = true;
             this.context = context;
             for (final Punctuation punctuation : punctuations) {
@@ -211,7 +214,7 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void process(Object key, Object value) {
+        public void process(final Object key, final Object value) {
             processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic()));
             context.forward(key, value);
         }
@@ -228,7 +231,7 @@ public class TopologyTestDriverTest {
         private final Collection<Punctuation> punctuations;
 
         private MockProcessorSupplier() {
-            this(Collections.<Punctuation>emptySet());
+            this(Collections.emptySet());
         }
 
         private MockProcessorSupplier(final Collection<Punctuation> punctuations) {
@@ -391,8 +394,7 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords.size());
 
         final Record record = processedRecords.get(0);
-        final Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        final Record expectedResult = new Record(consumerRecord1, 0L);
 
         assertThat(record, equalTo(expectedResult));
     }
@@ -410,8 +412,7 @@ public class TopologyTestDriverTest {
         assertEquals(0, processedRecords2.size());
 
         Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        Record expectedResult = new Record(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         testDriver.pipeInput(consumerRecord2);
@@ -420,8 +421,7 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords2.size());
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2);
-        expectedResult.offset = 0L;
+        expectedResult = new Record(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -439,7 +439,7 @@ public class TopologyTestDriverTest {
         topology.addSink(
             "sink",
             SINK_TOPIC_1,
-            new Serializer() {
+            new Serializer<Object>() {
                 @Override
                 public byte[] serialize(final String topic, final Object data) {
                     if (data instanceof Long) {
@@ -452,7 +452,7 @@ public class TopologyTestDriverTest {
                 @Override
                 public void configure(final Map configs, final boolean isKey) {}
             },
-            new Serializer() {
+            new Serializer<Object>() {
                 @Override
                 public byte[] serialize(final String topic, final Object data) {
                     if (data instanceof String) {
@@ -560,13 +560,11 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords2.size());
 
         Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        Record expectedResult = new Record(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2);
-        expectedResult.offset = 0L;
+        expectedResult = new Record(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -601,8 +599,7 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords.size());
 
         final Record record = processedRecords.get(0);
-        final Record expectedResult = new Record(consumerRecord1);
-        expectedResult.offset = 0L;
+        final Record expectedResult = new Record(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -687,13 +684,14 @@ public class TopologyTestDriverTest {
     @Test
     public void shouldReturnAllStores() {
         final Topology topology = setupSourceSinkTopology();
+        topology.addProcessor("processor", () -> null);
         topology.addStateStore(
             new KeyValueStoreBuilder<>(
                 Stores.inMemoryKeyValueStore("store"),
                 Serdes.ByteArray(),
                 Serdes.ByteArray(),
-                new SystemTime())
-                .withLoggingDisabled());
+                new SystemTime()),
+            "processor");
         topology.addGlobalStore(
             new KeyValueStoreBuilder<>(
                 Stores.inMemoryKeyValueStore("globalStore"),
@@ -705,12 +703,41 @@ public class TopologyTestDriverTest {
             Serdes.ByteArray().deserializer(),
             "globalTopicName",
             "globalProcessorName",
-            new ProcessorSupplier() {
-                @Override
-                public Processor get() {
-                    return null;
-                }
-            });
+            () -> null);
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        final Set<String> expectedStoreNames = new HashSet<>();
+        expectedStoreNames.add("store");
+        expectedStoreNames.add("globalStore");
+        final Map<String, StateStore> allStores = testDriver.getAllStateStores();
+        assertThat(allStores.keySet(), equalTo(expectedStoreNames));
+        for (final StateStore store : allStores.values()) {
+            assertNotNull(store);
+        }
+    }
+
+    @Test
+    public void shouldReturnAllStoresNames() {
+        final Topology topology = setupSourceSinkTopology();
+        topology.addStateStore(
+            new KeyValueStoreBuilder<>(
+                Stores.inMemoryKeyValueStore("store"),
+                Serdes.ByteArray(),
+                Serdes.ByteArray(),
+                new SystemTime()));
+        topology.addGlobalStore(
+            new KeyValueStoreBuilder<>(
+                Stores.inMemoryKeyValueStore("globalStore"),
+                Serdes.ByteArray(),
+                Serdes.ByteArray(),
+                new SystemTime()).withLoggingDisabled(),
+            "sourceProcessorName",
+            Serdes.ByteArray().deserializer(),
+            Serdes.ByteArray().deserializer(),
+            "globalTopicName",
+            "globalProcessorName",
+            () -> null);
 
         testDriver = new TopologyTestDriver(topology, config);
 
@@ -721,13 +748,13 @@ public class TopologyTestDriverTest {
     }
 
     private void setup() {
-        Topology topology = new Topology();
+        final Topology topology = new Topology();
         topology.addSource("sourceProcessor", "input-topic");
         topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
         topology.addStateStore(Stores.keyValueStoreBuilder(
             Stores.inMemoryKeyValueStore("aggStore"),
             Serdes.String(),
-            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
+            Serdes.Long()),
             "aggregator");
         topology.addSink("sinkProcessor", "result-topic", "aggregator");
 
@@ -812,18 +839,8 @@ public class TopologyTestDriverTest {
         @Override
         public void init(final ProcessorContext context) {
             this.context = context;
-            context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
-                @Override
-                public void punctuate(final long timestamp) {
-                    flushStore();
-                }
-            });
-            context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
-                @Override
-                public void punctuate(final long timestamp) {
-                    flushStore();
-                }
-            });
+            context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore());
+            context.schedule(10000, PunctuationType.STREAM_TIME, timestamp -> flushStore());
             store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
         }
 
@@ -908,7 +925,7 @@ public class TopologyTestDriverTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.globalTable("topic",
             Consumed.with(Serdes.String(), Serdes.String()),
-            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalStore"));
+            Materialized.as("globalStore"));
         try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) {
             final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
             Assert.assertNotNull(globalStore);
@@ -956,8 +973,7 @@ public class TopologyTestDriverTest {
         assertEquals(0, processedRecords2.size());
 
         final Record record1 = processedRecords1.get(0);
-        final Record expectedResult1 = new Record(consumerRecord1);
-        expectedResult1.offset = 0L;
+        final Record expectedResult1 = new Record(consumerRecord1, 0L);
         assertThat(record1, equalTo(expectedResult1));
 
         testDriver.pipeInput(consumerRecord2);
@@ -966,8 +982,7 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords2.size());
 
         final Record record2 = processedRecords2.get(0);
-        final Record expectedResult2 = new Record(consumerRecord2);
-        expectedResult2.offset = 0L;
+        final Record expectedResult2 = new Record(consumerRecord2, 0L);
         assertThat(record2, equalTo(expectedResult2));
     }
 
@@ -1004,7 +1019,7 @@ public class TopologyTestDriverTest {
         try {
             testDriver.pipeInput(consumerRecord1);
         } catch (final TopologyException exception) {
-            String str =
+            final String str =
                     String.format(
                             "Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for " +
                                     "input record topic: %s and hence cannot process the message.",

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message