kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: code cleanup (#6056)
Date Tue, 08 Jan 2019 21:33:05 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3991d81  MINOR: code cleanup (#6056)
3991d81 is described below

commit 3991d81f6c645bdf36c58e3d56b829ff92dbff3a
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Tue Jan 8 22:32:53 2019 +0100

    MINOR: code cleanup (#6056)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../apache/kafka/streams/StreamsBuilderTest.java   | 245 +++++++++++------
 .../org/apache/kafka/streams/TopologyTest.java     |  36 +--
 .../streams/integration/EosIntegrationTest.java    |  96 +++----
 .../KStreamAggregationIntegrationTest.java         |   4 +-
 .../kstream/internals/KGroupedStreamImplTest.java  | 272 ++++++++-----------
 .../kstream/internals/KGroupedTableImplTest.java   | 234 +++++++++-------
 .../kstream/internals/KStreamKStreamJoinTest.java  |  31 ++-
 .../kstream/internals/KStreamPrintTest.java        |  48 ++--
 ...KStreamSessionWindowAggregateProcessorTest.java |  49 ++--
 .../streams/kstream/internals/KTableImplTest.java  | 301 +++++++++++----------
 .../kstream/internals/KTableMapValuesTest.java     |  35 ++-
 .../internals/KTableTransformValuesTest.java       |  25 +-
 .../internals/TimeWindowedKStreamImplTest.java     | 118 ++++----
 .../apache/kafka/streams/perf/SimpleBenchmark.java |   4 +-
 .../processor/internals/AbstractTaskTest.java      |   4 +-
 .../internals/ProcessorStateManagerTest.java       |   4 +-
 .../CompositeReadOnlySessionStoreTest.java         |   9 +-
 .../state/internals/RocksDBSessionStoreTest.java   |   4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 19 files changed, 818 insertions(+), 705 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 894b561..9e88a87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -45,9 +45,9 @@ import java.util.Properties;
 
 import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class StreamsBuilderTest {
 
@@ -64,84 +64,143 @@ public class StreamsBuilderTest {
 
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
-        final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.allGoodPredicate());
-        builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
+        final KTable<Bytes, String> filteredKTable = builder
+            .<Bytes, String>table("table-topic")
+            .filter(MockPredicate.allGoodPredicate());
+        builder
+            .<Bytes, String>stream("stream-topic")
+            .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-
-        assertThat(topology.stateStores().size(), equalTo(1));
-        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
-        assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty(), is(true));
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(
+            topology.stateStores().size(),
+            equalTo(1));
+        assertThat(
+            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+            equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        assertTrue(
+            topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty());
     }
 
     @Test
     public void shouldAllowJoinMaterializedFilteredKTable() {
-        final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic")
-                .filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
-        builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
+        final KTable<Bytes, String> filteredKTable = builder
+            .<Bytes, String>table("table-topic")
+            .filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
+        builder
+            .<Bytes, String>stream("stream-topic")
+            .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-
-        assertThat(topology.stateStores().size(), equalTo(1));
-        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
-        assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"), equalTo(Collections.singleton("store")));
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(
+            topology.stateStores().size(),
+            equalTo(1));
+        assertThat(
+            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+            equalTo(Collections.singleton("store")));
+        assertThat(
+            topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"),
+            equalTo(Collections.singleton("store")));
     }
 
     @Test
     public void shouldAllowJoinUnmaterializedMapValuedKTable() {
-        final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.noOpValueMapper());
-        builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
+        final KTable<Bytes, String> mappedKTable = builder
+            .<Bytes, String>table("table-topic")
+            .mapValues(MockMapper.noOpValueMapper());
+        builder
+            .<Bytes, String>stream("stream-topic")
+            .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-
-        assertThat(topology.stateStores().size(), equalTo(1));
-        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
-        assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty(), is(true));
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(
+            topology.stateStores().size(),
+            equalTo(1));
+        assertThat(
+            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+            equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        assertTrue(
+            topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty());
     }
 
     @Test
     public void shouldAllowJoinMaterializedMapValuedKTable() {
-        final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic")
-                .mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
-        builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
+        final KTable<Bytes, String> mappedKTable = builder
+            .<Bytes, String>table("table-topic")
+            .mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
+        builder
+            .<Bytes, String>stream("stream-topic")
+            .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-
-        assertThat(topology.stateStores().size(), equalTo(1));
-        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
-        assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"), equalTo(Collections.singleton("store")));
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(
+            topology.stateStores().size(),
+            equalTo(1));
+        assertThat(
+            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
+            equalTo(Collections.singleton("store")));
+        assertThat(
+            topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"),
+            equalTo(Collections.singleton("store")));
     }
 
     @Test
     public void shouldAllowJoinUnmaterializedJoinedKTable() {
         final KTable<Bytes, String> table1 = builder.table("table-topic1");
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
-        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
+        builder
+            .<Bytes, String>stream("stream-topic")
+            .join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-
-        assertThat(topology.stateStores().size(), equalTo(2));
-        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
-        assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(), is(true));
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(
+            topology.stateStores().size(),
+            equalTo(2));
+        assertThat(
+            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
+            equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
+        assertTrue(
+            topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty());
     }
 
     @Test
     public void shouldAllowJoinMaterializedJoinedKTable() {
         final KTable<Bytes, String> table1 = builder.table("table-topic1");
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
-        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
+        builder
+            .<Bytes, String>stream("stream-topic")
+            .join(
+                table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")),
+                MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-
-        assertThat(topology.stateStores().size(), equalTo(3));
-        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Collections.singleton("store")));
-        assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"), equalTo(Collections.singleton("store")));
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(
+            topology.stateStores().size(),
+            equalTo(3));
+        assertThat(
+            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
+            equalTo(Collections.singleton("store")));
+        assertThat(
+            topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"),
+            equalTo(Collections.singleton("store")));
     }
 
     @Test
@@ -150,11 +209,18 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
-
-        assertThat(topology.stateStores().size(), equalTo(1));
-        assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
-        assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertThat(
+            topology.stateStores().size(),
+            equalTo(1));
+        assertThat(
+            topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"),
+            equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        assertThat(
+            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"),
+            equalTo(Collections.singleton(topology.stateStores().get(0).name())));
     }
 
     @Test
@@ -163,16 +229,17 @@ public class StreamsBuilderTest {
         source.to("topic-sink");
 
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-
         source.process(processorSupplier);
 
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
-            final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
             driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
         }
 
         // no exception was thrown
-        assertEquals(asList("A:aa"), processorSupplier.theCapturedProcessor().processed);
+        assertEquals(Collections.singletonList("A:aa"), processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -181,18 +248,20 @@ public class StreamsBuilderTest {
         final KStream<String, String> through = source.through("topic-sink");
 
         final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
-        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
-
         source.process(sourceProcessorSupplier);
+
+        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
         through.process(throughProcessorSupplier);
 
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
-            final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
             driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
         }
 
-        assertEquals(asList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
-        assertEquals(asList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
+        assertEquals(Collections.singletonList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
+        assertEquals(Collections.singletonList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
     }
     
     @Test
@@ -207,7 +276,9 @@ public class StreamsBuilderTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
             driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
@@ -222,18 +293,15 @@ public class StreamsBuilderTest {
     public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
         final Map<Long, String> results = new HashMap<>();
         final String topic = "topic";
-        final ForeachAction<Long, String> action = new ForeachAction<Long, String>() {
-            @Override
-            public void apply(final Long key, final String value) {
-                results.put(key, value);
-            }
-        };
+        final ForeachAction<Long, String> action = results::put;
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
                 .withKeySerde(Serdes.Long())
                 .withValueSerde(Serdes.String()))
                 .toStream().foreach(action);
 
-        final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+        final ConsumerRecordFactory<Long, String> recordFactory =
+            new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
             driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
@@ -253,7 +321,9 @@ public class StreamsBuilderTest {
                 .withKeySerde(Serdes.Long())
                 .withValueSerde(Serdes.String()));
 
-        final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+        final ConsumerRecordFactory<Long, String> recordFactory =
+            new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
             driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
@@ -269,7 +339,8 @@ public class StreamsBuilderTest {
         final String topic = "topic";
         builder.table(topic, Materialized.with(Serdes.Long(), Serdes.String()));
 
-        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        final ProcessorTopology topology =
+            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(0));
     }
@@ -285,13 +356,18 @@ public class StreamsBuilderTest {
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
         internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
 
-        assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic")));
-
-        assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
-
-        assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(false));
-
-        assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), equalTo(true));
+        assertThat(
+            internalTopologyBuilder.build().storeToChangelogTopic(),
+            equalTo(Collections.singletonMap("store", "topic")));
+        assertThat(
+            internalTopologyBuilder.getStateStores().keySet(),
+            equalTo(Collections.singleton("store")));
+        assertThat(
+            internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+            equalTo(false));
+        assertThat(
+            internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
+            equalTo(true));
     }
 
     @Test
@@ -302,24 +378,29 @@ public class StreamsBuilderTest {
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId("appId");
 
-        assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog")));
-
-        assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
-
-        assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(true));
-
-        assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("appId-store-changelog")));
+        assertThat(
+            internalTopologyBuilder.build().storeToChangelogTopic(),
+            equalTo(Collections.singletonMap("store", "appId-store-changelog")));
+        assertThat(
+            internalTopologyBuilder.getStateStores().keySet(),
+            equalTo(Collections.singleton("store")));
+        assertThat(
+            internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+            equalTo(true));
+        assertThat(
+            internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
+            equalTo(Collections.singleton("appId-store-changelog")));
     }
     
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() {
-        builder.stream(Collections.<String>emptyList());
+        builder.stream(Collections.emptyList());
         builder.build();
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowExceptionWhenTopicNamesAreNull() {
-        builder.stream(Arrays.<String>asList(null, null));
+        builder.stream(Arrays.asList(null, null));
         builder.build();
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 046ffb0..7ea9b2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -374,8 +374,8 @@ public class TopologyTest {
 
     @Test
     public void sinkShouldReturnNullTopicWithDynamicRouting() {
-        final TopologyDescription.Sink expectedSinkNode
-                = new InternalTopologyBuilder.Sink("sink", (key, value, record) -> record.topic() + "-" + key);
+        final TopologyDescription.Sink expectedSinkNode =
+            new InternalTopologyBuilder.Sink("sink", (key, value, record) -> record.topic() + "-" + key);
 
         assertThat(expectedSinkNode.topic(), equalTo(null));
     }
@@ -383,8 +383,8 @@ public class TopologyTest {
     @Test
     public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
         final TopicNameExtractor topicNameExtractor = (key, value, record) -> record.topic() + "-" + key;
-        final TopologyDescription.Sink expectedSinkNode
-                = new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+        final TopologyDescription.Sink expectedSinkNode =
+            new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
 
         assertThat(expectedSinkNode.topicNameExtractor(), equalTo(topicNameExtractor));
     }
@@ -459,8 +459,8 @@ public class TopologyTest {
     public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
         final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
         final String[] store = new String[] {"store"};
-        final TopologyDescription.Processor expectedProcessorNode
-            = addProcessorWithNewStore("processor", store, expectedSourceNode);
+        final TopologyDescription.Processor expectedProcessorNode =
+            addProcessorWithNewStore("processor", store, expectedSourceNode);
 
         final Set<TopologyDescription.Node> allNodes = new HashSet<>();
         allNodes.add(expectedSourceNode);
@@ -475,8 +475,8 @@ public class TopologyTest {
     public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
         final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
         final String[] stores = new String[] {"store1", "store2"};
-        final TopologyDescription.Processor expectedProcessorNode
-            = addProcessorWithNewStore("processor", stores, expectedSourceNode);
+        final TopologyDescription.Processor expectedProcessorNode =
+            addProcessorWithNewStore("processor", stores, expectedSourceNode);
 
         final Set<TopologyDescription.Node> allNodes = new HashSet<>();
         allNodes.add(expectedSourceNode);
@@ -612,16 +612,16 @@ public class TopologyTest {
         final String[] bothStores = new String[] {store1[0], store2[0]};
 
         final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode1
-            = addProcessorWithNewStore("processor1", store1, expectedSourceNode1);
+        final TopologyDescription.Processor expectedProcessorNode1 =
+            addProcessorWithNewStore("processor1", store1, expectedSourceNode1);
 
         final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Processor expectedProcessorNode2
-            = addProcessorWithNewStore("processor2", store2, expectedSourceNode2);
+        final TopologyDescription.Processor expectedProcessorNode2 =
+            addProcessorWithNewStore("processor2", store2, expectedSourceNode2);
 
         final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Processor expectedProcessorNode3
-            = addProcessorWithExistingStore("processor3", bothStores, expectedSourceNode3);
+        final TopologyDescription.Processor expectedProcessorNode3 =
+            addProcessorWithExistingStore("processor3", bothStores, expectedSourceNode3);
 
         final Set<TopologyDescription.Node> allNodes = new HashSet<>();
         allNodes.add(expectedSourceNode1);
@@ -1138,8 +1138,8 @@ public class TopologyTest {
         } else {
             topology.connectProcessorAndStateStores(processorName, storeNames);
         }
-        final TopologyDescription.Processor expectedProcessorNode
-            = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
+        final TopologyDescription.Processor expectedProcessorNode =
+            new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
 
         for (final TopologyDescription.Node parent : parents) {
             ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessorNode);
@@ -1158,8 +1158,8 @@ public class TopologyTest {
         }
 
         topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
-        final TopologyDescription.Sink expectedSinkNode
-            = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
+        final TopologyDescription.Sink expectedSinkNode =
+            new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
 
         for (final TopologyDescription.Node parent : parents) {
             ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSinkNode);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index b2bd0a8..bdfbb3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -185,21 +185,21 @@ public class EosIntegrationTest {
                     CLUSTER.time
                 );
 
-                final List<KeyValue<Long, Long>> committedRecords
-                    = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                    TestUtils.consumerConfig(
-                        CLUSTER.bootstrapServers(),
-                        CONSUMER_GROUP_ID,
-                        LongDeserializer.class,
-                        LongDeserializer.class,
-                        new Properties() {
-                            {
-                                put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                            }
-                        }),
-                    outputTopic,
-                    inputData.size()
-                );
+                final List<KeyValue<Long, Long>> committedRecords =
+                    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                        TestUtils.consumerConfig(
+                            CLUSTER.bootstrapServers(),
+                            CONSUMER_GROUP_ID,
+                            LongDeserializer.class,
+                            LongDeserializer.class,
+                            new Properties() {
+                                {
+                                    put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+                                }
+                            }),
+                        outputTopic,
+                        inputData.size()
+                    );
 
                 checkResultPerKey(committedRecords, inputData);
             } finally {
@@ -273,21 +273,21 @@ public class EosIntegrationTest {
                 CLUSTER.time
             );
 
-            final List<KeyValue<Long, Long>> firstCommittedRecords
-                = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    CONSUMER_GROUP_ID,
-                    LongDeserializer.class,
-                    LongDeserializer.class,
-                    new Properties() {
-                        {
-                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                        }
-                    }),
-                SINGLE_PARTITION_OUTPUT_TOPIC,
-                firstBurstOfData.size()
-            );
+            final List<KeyValue<Long, Long>> firstCommittedRecords =
+                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                    TestUtils.consumerConfig(
+                        CLUSTER.bootstrapServers(),
+                        CONSUMER_GROUP_ID,
+                        LongDeserializer.class,
+                        LongDeserializer.class,
+                        new Properties() {
+                            {
+                                put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+                            }
+                        }),
+                    SINGLE_PARTITION_OUTPUT_TOPIC,
+                    firstBurstOfData.size()
+                );
 
             assertThat(firstCommittedRecords, equalTo(firstBurstOfData));
 
@@ -298,21 +298,21 @@ public class EosIntegrationTest {
                 CLUSTER.time
             );
 
-            final List<KeyValue<Long, Long>> secondCommittedRecords
-                = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    CONSUMER_GROUP_ID,
-                    LongDeserializer.class,
-                    LongDeserializer.class,
-                    new Properties() {
-                        {
-                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                        }
-                    }),
-                SINGLE_PARTITION_OUTPUT_TOPIC,
-                secondBurstOfData.size()
-            );
+            final List<KeyValue<Long, Long>> secondCommittedRecords =
+                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                    TestUtils.consumerConfig(
+                        CLUSTER.bootstrapServers(),
+                        CONSUMER_GROUP_ID,
+                        LongDeserializer.class,
+                        LongDeserializer.class,
+                        new Properties() {
+                            {
+                                put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+                            }
+                        }),
+                    SINGLE_PARTITION_OUTPUT_TOPIC,
+                    secondBurstOfData.size()
+                );
 
             assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
         } finally {
@@ -593,9 +593,9 @@ public class EosIntegrationTest {
         String[] storeNames = null;
         if (withState) {
             storeNames = new String[] {storeName};
-            final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder
-                    = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long())
-                    .withCachingEnabled();
+            final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder = Stores
+                .keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long())
+                .withCachingEnabled();
 
             builder.addStateStore(storeBuilder);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 04cc0e1..3ba8f09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -628,8 +628,8 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
-        final ReadOnlySessionStore<String, String> sessionStore
-                = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
+        final ReadOnlySessionStore<String, String> sessionStore =
+            kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
 
         // verify correct data received
         assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index f2fc4f8..9bdea13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,20 +22,15 @@ import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Merger;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -43,7 +38,6 @@ import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -76,14 +70,14 @@ public class KGroupedStreamImplTest {
     private final StreamsBuilder builder = new StreamsBuilder();
     private KGroupedStream<String, String> groupedStream;
 
-    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final ConsumerRecordFactory<String, String> recordFactory =
+        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
-
     }
 
     @Test(expected = NullPointerException.class)
@@ -93,12 +87,14 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameOnReduce() {
-        groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullReducerWithWindowedReduce() {
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).reduce(null, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+        groupedStream
+            .windowedBy(TimeWindows.of(ofMillis(10)))
+            .reduce(null, Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
@@ -108,32 +104,41 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).reduce(MockReducer.STRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedStream
+            .windowedBy(TimeWindows.of(ofMillis(10)))
+            .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnAggregate() {
-        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullAdderOnAggregate() {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
     }
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameOnAggregate() {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            Materialized.as(INVALID_STORE_NAME));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnWindowedAggregate() {
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+        groupedStream
+            .windowedBy(TimeWindows.of(ofMillis(10)))
+            .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullAdderOnWindowedAggregate() {
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).aggregate(MockInitializer.STRING_INIT, null, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+        groupedStream
+            .windowedBy(TimeWindows.of(ofMillis(10)))
+            .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
@@ -143,7 +148,9 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(10))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedStream
+            .windowedBy(TimeWindows.of(ofMillis(10)))
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
     }
 
     private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
@@ -163,28 +170,16 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldAggregateSessionWindows() {
         final Map<Windowed<String>, Integer> results = new HashMap<>();
-        final KTable<Windowed<String>, Integer> table = groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(new Initializer<Integer>() {
-            @Override
-            public Integer apply() {
-                return 0;
-            }
-        }, new Aggregator<String, String, Integer>() {
-            @Override
-            public Integer apply(final String aggKey, final String value, final Integer aggregate) {
-                return aggregate + 1;
-            }
-        }, new Merger<String, Integer>() {
-            @Override
-            public Integer apply(final String aggKey, final Integer aggOne, final Integer aggTwo) {
-                return aggOne + aggTwo;
-            }
-        }, Materialized.<String, Integer, SessionStore<Bytes, byte[]>>as("session-store").withValueSerde(Serdes.Integer()));
-        table.toStream().foreach(new ForeachAction<Windowed<String>, Integer>() {
-            @Override
-            public void apply(final Windowed<String> key, final Integer value) {
-                results.put(key, value);
-            }
-        });
+        final KTable<Windowed<String>, Integer> table = groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .aggregate(
+                () -> 0,
+                (aggKey, value, aggregate) -> aggregate + 1,
+                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                Materialized
+                    .<String, Integer, SessionStore<Bytes, byte[]>>as("session-store").
+                    withValueSerde(Serdes.Integer()));
+        table.toStream().foreach(results::put);
 
         doAggregateSessionWindows(results);
         assertEquals(table.queryableStoreName(), "session-store");
@@ -193,28 +188,14 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldAggregateSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, Integer> results = new HashMap<>();
-        final KTable<Windowed<String>, Integer> table = groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(new Initializer<Integer>() {
-            @Override
-            public Integer apply() {
-                return 0;
-            }
-        }, new Aggregator<String, String, Integer>() {
-            @Override
-            public Integer apply(final String aggKey, final String value, final Integer aggregate) {
-                return aggregate + 1;
-            }
-        }, new Merger<String, Integer>() {
-            @Override
-            public Integer apply(final String aggKey, final Integer aggOne, final Integer aggTwo) {
-                return aggOne + aggTwo;
-            }
-        }, Materialized.<String, Integer, SessionStore<Bytes, byte[]>>with(null, Serdes.Integer()));
-        table.toStream().foreach(new ForeachAction<Windowed<String>, Integer>() {
-            @Override
-            public void apply(final Windowed<String> key, final Integer value) {
-                results.put(key, value);
-            }
-        });
+        final KTable<Windowed<String>, Integer> table = groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .aggregate(
+                () -> 0,
+                (aggKey, value, aggregate) -> aggregate + 1,
+                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                Materialized.with(null, Serdes.Integer()));
+        table.toStream().foreach(results::put);
 
         doAggregateSessionWindows(results);
     }
@@ -236,14 +217,10 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldCountSessionWindows() {
         final Map<Windowed<String>, Long> results = new HashMap<>();
-        final KTable<Windowed<String>, Long> table = groupedStream.windowedBy(SessionWindows.with(ofMillis(30)))
-                .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-store"));
-        table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
-            @Override
-            public void apply(final Windowed<String> key, final Long value) {
-                results.put(key, value);
-            }
-        });
+        final KTable<Windowed<String>, Long> table = groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .count(Materialized.as("session-store"));
+        table.toStream().foreach(results::put);
         doCountSessionWindows(results);
         assertEquals(table.queryableStoreName(), "session-store");
     }
@@ -251,13 +228,10 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldCountSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, Long> results = new HashMap<>();
-        final KTable<Windowed<String>, Long> table = groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).count();
-        table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
-            @Override
-            public void apply(final Windowed<String> key, final Long value) {
-                results.put(key, value);
-            }
-        });
+        final KTable<Windowed<String>, Long> table = groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .count();
+        table.toStream().foreach(results::put);
         doCountSessionWindows(results);
         assertNull(table.queryableStoreName());
     }
@@ -279,19 +253,10 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldReduceSessionWindows() {
         final Map<Windowed<String>, String> results = new HashMap<>();
-        final KTable<Windowed<String>, String> table = groupedStream.windowedBy(SessionWindows.with(ofMillis(30)))
-                .reduce(new Reducer<String>() {
-                    @Override
-                    public String apply(final String value1, final String value2) {
-                        return value1 + ":" + value2;
-                    }
-                }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("session-store"));
-        table.toStream().foreach(new ForeachAction<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String value) {
-                results.put(key, value);
-            }
-        });
+        final KTable<Windowed<String>, String> table = groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store"));
+        table.toStream().foreach(results::put);
         doReduceSessionWindows(results);
         assertEquals(table.queryableStoreName(), "session-store");
     }
@@ -299,26 +264,19 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldReduceSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, String> results = new HashMap<>();
-        final KTable<Windowed<String>, String> table = groupedStream.windowedBy(SessionWindows.with(ofMillis(30)))
-                .reduce(new Reducer<String>() {
-                    @Override
-                    public String apply(final String value1, final String value2) {
-                        return value1 + ":" + value2;
-                    }
-                });
-        table.toStream().foreach(new ForeachAction<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String value) {
-                results.put(key, value);
-            }
-        });
+        final KTable<Windowed<String>, String> table = groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .reduce((value1, value2) -> value1 + ":" + value2);
+        table.toStream().foreach(results::put);
         doReduceSessionWindows(results);
         assertNull(table.queryableStoreName());
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).reduce(null, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+        groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .reduce(null, Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
@@ -328,39 +286,51 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).reduce(MockReducer.STRING_ADDER, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).reduce(null, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null));
+        groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .reduce(
+                null,
+                Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(null, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
-            @Override
-            public String apply(final String aggKey, final String aggOne, final String aggTwo) {
-                return null;
-            }
-        }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("storeName"));
+        groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .aggregate(
+                null,
+                MockAggregator.TOSTRING_ADDER,
+                (aggKey, aggOne, aggTwo) -> null,
+                Materialized.as("storeName"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(MockInitializer.STRING_INIT, null, new Merger<String, String>() {
-            @Override
-            public String apply(final String aggKey, final String aggOne, final String aggTwo) {
-                return null;
-            }
-        }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("storeName"));
+        groupedStream.
+            windowedBy(SessionWindows.with(ofMillis(30)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                null,
+                (aggKey, aggOne, aggTwo) -> null,
+                Materialized.as("storeName"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(30))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
+        groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
                 null,
-                Materialized.<String, String, SessionStore<Bytes, byte[]>>as("storeName"));
+                Materialized.as("storeName"));
     }
 
     @Test(expected = NullPointerException.class)
@@ -370,24 +340,24 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(10)))
-                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
-                    @Override
-                    public String apply(final String aggKey, final String aggOne, final String aggTwo) {
-                        return null;
-                    }
-                }, Materialized.<String, String, SessionStore<Bytes, byte[]>>with(Serdes.String(), Serdes.String()));
+        groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(10)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                (aggKey, aggOne, aggTwo) -> null,
+                Materialized.with(Serdes.String(), Serdes.String()));
     }
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
-        groupedStream.windowedBy(SessionWindows.with(ofMillis(10)))
-                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
-                    @Override
-                    public String apply(final String aggKey, final String aggOne, final String aggTwo) {
-                        return null;
-                    }
-                }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedStream
+            .windowedBy(SessionWindows.with(ofMillis(10)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                (aggKey, aggOne, aggTwo) -> null,
+                Materialized.as(INVALID_STORE_NAME));
     }
 
     @SuppressWarnings("unchecked")
@@ -506,16 +476,10 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldAggregateWithDefaultSerdes() {
         final Map<String, String> results = new HashMap<>();
-        groupedStream.aggregate(
-            MockInitializer.STRING_INIT,
-            MockAggregator.TOSTRING_ADDER)
+        groupedStream
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
             .toStream()
-            .foreach(new ForeachAction<String, String>() {
-                @Override
-                public void apply(final String key, final String value) {
-                    results.put(key, value);
-                }
-            });
+            .foreach(results::put);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             processData(driver);
@@ -560,14 +524,11 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldCountWindowed() {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(500L))).count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("aggregate-by-key-windowed"))
+        groupedStream
+            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .count(Materialized.as("aggregate-by-key-windowed"))
             .toStream()
-            .foreach(new ForeachAction<Windowed<String>, Long>() {
-                @Override
-                public void apply(final Windowed<String> key, final Long value) {
-                    results.add(KeyValue.pair(key, value));
-                }
-            });
+            .foreach((key, value) -> results.add(KeyValue.pair(key, value)));
 
         doCountWindowed(results);
     }
@@ -575,14 +536,11 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldCountWindowedWithInternalStoreName() {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(500L))).count()
+        groupedStream
+            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .count()
             .toStream()
-            .foreach(new ForeachAction<Windowed<String>, Long>() {
-                @Override
-                public void apply(final Windowed<String> key, final Long value) {
-                    results.add(KeyValue.pair(key, value));
-                }
-            });
+            .foreach((key, value) -> results.add(KeyValue.pair(key, value)));
 
         doCountWindowed(results);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 99f1b81..09f93e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -21,11 +21,10 @@ import org.apache.kafka.common.serialization.DoubleSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KTable;
@@ -50,7 +49,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-
 public class KGroupedTableImplTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
@@ -61,57 +59,84 @@ public class KGroupedTableImplTest {
 
     @Before
     public void before() {
-        groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
-                .groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
+        groupedTable = builder
+            .table("blah", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupBy(MockMapper.selectValueKeyValueMapper());
     }
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotAllowInvalidStoreNameOnAggregate() {
-        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedTable.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            MockAggregator.TOSTRING_REMOVER,
+            Materialized.as(INVALID_STORE_NAME));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullInitializerOnAggregate() {
-        groupedTable.aggregate(null, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.aggregate(
+            null,
+            MockAggregator.TOSTRING_ADDER,
+            MockAggregator.TOSTRING_REMOVER,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullAdderOnAggregate() {
-        groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.TOSTRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.aggregate(
+            MockInitializer.STRING_INIT,
+            null,
+            MockAggregator.TOSTRING_REMOVER,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullSubtractorOnAggregate() {
-        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            null,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullAdderOnReduce() {
-        groupedTable.reduce(null, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.reduce(
+            null,
+            MockReducer.STRING_REMOVER,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullSubtractorOnReduce() {
-        groupedTable.reduce(MockReducer.STRING_ADDER, null, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.reduce(
+            MockReducer.STRING_ADDER,
+            null,
+            Materialized.as("store"));
     }
 
     @Test(expected = InvalidTopicException.class)
     public void shouldNotAllowInvalidStoreNameOnReduce() {
-        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(INVALID_STORE_NAME));
+        groupedTable.reduce(
+            MockReducer.STRING_ADDER,
+            MockReducer.STRING_REMOVER,
+            Materialized.as(INVALID_STORE_NAME));
     }
 
     private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
         final Map<String, Integer> reducedResults = new HashMap<>();
-        inputKTable.toStream().foreach(new ForeachAction<String, Integer>() {
-            @Override
-            public void apply(final String key, final Integer value) {
-                reducedResults.put(key, value);
-            }
-        });
+        inputKTable
+            .toStream()
+            .foreach(reducedResults::put);
         return reducedResults;
     }
-    private void assertReduced(final Map<String, Integer> reducedResults, final String topic, final TopologyTestDriver driver) {
-        final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
+
+    private void assertReduced(final Map<String, Integer> reducedResults,
+                               final String topic,
+                               final TopologyTestDriver driver) {
+        final ConsumerRecordFactory<String, Double> recordFactory =
+            new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
         driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
         driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
 
@@ -130,20 +155,20 @@ public class KGroupedTableImplTest {
     @Test
     public void shouldReduce() {
         final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
-            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(final String key, final Number value) {
-                    return KeyValue.pair(key, value.intValue());
-                }
-            };
-
-        final KTable<String, Integer> reduced = builder.table(topic,
-                                                              Consumed.with(Serdes.String(), Serdes.Double()),
-                                                              Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
-                                                                      .withKeySerde(Serdes.String())
-                                                                      .withValueSerde(Serdes.Double()))
+            (key, value) -> KeyValue.pair(key, value.intValue());
+
+        final KTable<String, Integer> reduced = builder
+            .table(
+                topic,
+                Consumed.with(Serdes.String(), Serdes.Double()),
+                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Double()))
             .groupBy(intProjection)
-            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("reduced"));
+            .reduce(
+                MockReducer.INTEGER_ADDER,
+                MockReducer.INTEGER_SUBTRACTOR,
+                Materialized.as("reduced"));
 
         final Map<String, Integer> results = getReducedResults(reduced);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -155,18 +180,15 @@ public class KGroupedTableImplTest {
     @Test
     public void shouldReduceWithInternalStoreName() {
         final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
-            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(final String key, final Number value) {
-                    return KeyValue.pair(key, value.intValue());
-                }
-            };
-
-        final KTable<String, Integer> reduced = builder.table(topic,
-                                                              Consumed.with(Serdes.String(), Serdes.Double()),
-                                                              Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
-                                                                      .withKeySerde(Serdes.String())
-                                                                      .withValueSerde(Serdes.Double()))
+            (key, value) -> KeyValue.pair(key, value.intValue());
+
+        final KTable<String, Integer> reduced = builder
+            .table(
+                topic,
+                Consumed.with(Serdes.String(), Serdes.Double()),
+                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Double()))
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
 
@@ -181,20 +203,19 @@ public class KGroupedTableImplTest {
     @Test
     public void shouldReduceAndMaterializeResults() {
         final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
-            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(final String key, final Number value) {
-                    return KeyValue.pair(key, value.intValue());
-                }
-            };
-
-        final KTable<String, Integer> reduced = builder.table(topic, Consumed.with(Serdes.String(), Serdes.Double()))
-                .groupBy(intProjection)
-                .reduce(MockReducer.INTEGER_ADDER,
-                        MockReducer.INTEGER_SUBTRACTOR,
-                        Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("reduce")
-                                .withKeySerde(Serdes.String())
-                                .withValueSerde(Serdes.Integer()));
+            (key, value) -> KeyValue.pair(key, value.intValue());
+
+        final KTable<String, Integer> reduced = builder
+            .table(
+                topic,
+                Consumed.with(Serdes.String(), Serdes.Double()))
+            .groupBy(intProjection)
+            .reduce(
+                MockReducer.INTEGER_ADDER,
+                MockReducer.INTEGER_SUBTRACTOR,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("reduce")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Integer()));
 
         final Map<String, Integer> results = getReducedResults(reduced);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -208,11 +229,17 @@ public class KGroupedTableImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldCountAndMaterializeResults() {
-        final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        table.groupBy(MockMapper.selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String()))
-                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
-                               .withKeySerde(Serdes.String())
-                               .withValueSerde(Serdes.Long()));
+        builder
+            .table(
+                topic,
+                Consumed.with(Serdes.String(), Serdes.String()))
+            .groupBy(
+                MockMapper.selectValueKeyValueMapper(),
+                Grouped.with(Serdes.String(), Serdes.String()))
+            .count(
+                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Long()));
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             processData(topic, driver);
@@ -225,14 +252,20 @@ public class KGroupedTableImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldAggregateAndMaterializeResults() {
-        final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String()))
-                .aggregate(MockInitializer.STRING_INIT,
-                           MockAggregator.TOSTRING_ADDER,
-                           MockAggregator.TOSTRING_REMOVER,
-                           Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
-                                   .withValueSerde(Serdes.String())
-                                   .withKeySerde(Serdes.String()));
+        builder
+            .table(
+                topic,
+                Consumed.with(Serdes.String(), Serdes.String()))
+            .groupBy(
+                MockMapper.selectValueKeyValueMapper(),
+                Grouped.with(Serdes.String(), Serdes.String()))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                MockAggregator.TOSTRING_REMOVER,
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
+                    .withValueSerde(Serdes.String())
+                    .withKeySerde(Serdes.String()));
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             processData(topic, driver);
@@ -251,54 +284,69 @@ public class KGroupedTableImplTest {
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
-        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (Materialized) null);
+        groupedTable.reduce(
+            MockReducer.STRING_ADDER,
+            MockReducer.STRING_REMOVER,
+            (Materialized) null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnReduceWhenAdderIsNull() {
-        groupedTable.reduce(null, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.reduce(
+            null,
+            MockReducer.STRING_REMOVER,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnReduceWhenSubtractorIsNull() {
-        groupedTable.reduce(MockReducer.STRING_ADDER, null, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.reduce(
+            MockReducer.STRING_ADDER,
+            null,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnAggregateWhenInitializerIsNull() {
-        groupedTable.aggregate(null,
-                               MockAggregator.TOSTRING_ADDER,
-                               MockAggregator.TOSTRING_REMOVER,
-                               Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.aggregate(
+            null,
+            MockAggregator.TOSTRING_ADDER,
+            MockAggregator.TOSTRING_REMOVER,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnAggregateWhenAdderIsNull() {
-        groupedTable.aggregate(MockInitializer.STRING_INIT,
-                               null,
-                               MockAggregator.TOSTRING_REMOVER,
-                               Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.aggregate(
+            MockInitializer.STRING_INIT,
+            null,
+            MockAggregator.TOSTRING_REMOVER,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnAggregateWhenSubtractorIsNull() {
-        groupedTable.aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.TOSTRING_ADDER,
-                               null,
-                               Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        groupedTable.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            null,
+            Materialized.as("store"));
     }
 
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
-        groupedTable.aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.TOSTRING_ADDER,
-                               MockAggregator.TOSTRING_REMOVER,
-                               (Materialized) null);
+        groupedTable.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            MockAggregator.TOSTRING_REMOVER,
+            (Materialized) null);
     }
 
-    private void processData(final String topic, final TopologyTestDriver driver) {
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private void processData(final String topic,
+                             final TopologyTestDriver driver) {
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         driver.pipeInput(recordFactory.create(topic, "A", "1"));
         driver.pipeInput(recordFactory.create(topic, "B", "1"));
         driver.pipeInput(recordFactory.create(topic, "C", "1"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index f2e3cc9..133bd55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,14 +19,13 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
@@ -53,7 +52,8 @@ public class KStreamKStreamJoinTest {
     final private String topic2 = "topic2";
 
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
-    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private final ConsumerRecordFactory<Integer, String> recordFactory =
+        new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
@@ -62,16 +62,12 @@ public class KStreamKStreamJoinTest {
 
         final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
         final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
-        final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+        final ConsumerRecordFactory<String, Integer> recordFactory =
+            new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
 
         left.join(
             right,
-            new ValueJoiner<Integer, Integer, Integer>() {
-                @Override
-                public Integer apply(final Integer value1, final Integer value2) {
-                    return value1 + value2;
-                }
-            },
+            (value1, value2) -> value1 + value2,
             JoinWindows.of(ofMillis(100)),
             Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
         );
@@ -106,7 +102,8 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -208,7 +205,8 @@ public class KStreamKStreamJoinTest {
             JoinWindows.of(ofMillis(100)),
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
-        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -313,7 +311,8 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -536,7 +535,8 @@ public class KStreamKStreamJoinTest {
                 Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -645,7 +645,8 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index ced57b2..9906556 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.easymock.EasyMock;
@@ -25,7 +24,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 
@@ -34,23 +33,16 @@ import static org.junit.Assert.assertEquals;
 public class KStreamPrintTest {
 
     private ByteArrayOutputStream byteOutStream;
-
-    private KeyValueMapper<Integer, String, String> mapper;
-    private KStreamPrint kStreamPrint;
-    private Processor printProcessor;
+    private Processor<Integer, String> printProcessor;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         byteOutStream = new ByteArrayOutputStream();
 
-        mapper = new KeyValueMapper<Integer, String, String>() {
-            @Override
-            public String apply(final Integer key, final String value) {
-                return String.format("%d, %s", key, value);
-            }
-        };
-
-        kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(byteOutStream, mapper, "test-stream"));
+        final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(
+            byteOutStream,
+            (key, value) -> String.format("%d, %s", key, value),
+            "test-stream"));
 
         printProcessor = kStreamPrint.get();
         final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
@@ -62,33 +54,27 @@ public class KStreamPrintTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testPrintStreamWithProvidedKeyValueMapper() {
-
         final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
                 new KeyValue<>(0, "zero"),
                 new KeyValue<>(1, "one"),
                 new KeyValue<>(2, "two"),
                 new KeyValue<>(3, "three"));
 
-        final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one", "[test-stream]: 2, two", "[test-stream]: 3, three"};
+        final String[] expectedResult = {
+            "[test-stream]: 0, zero",
+            "[test-stream]: 1, one",
+            "[test-stream]: 2, two",
+            "[test-stream]: 3, three"};
 
-        doTest(inputRecords, expectedResult);
-    }
-
-    private void assertFlushData(final String[] expectedResult, final ByteArrayOutputStream byteOutStream) {
+        for (final KeyValue<Integer, String> record: inputRecords) {
+            printProcessor.process(record.key, record.value);
+        }
+        printProcessor.close();
 
-        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");
+        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), StandardCharsets.UTF_8).split("\\r*\\n");
         for (int i = 0; i < flushOutDatas.length; i++) {
             assertEquals(expectedResult[i], flushOutDatas[i]);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private <K, V> void doTest(final List<KeyValue<K, V>> inputRecords, final String[] expectedResult) {
-
-        for (final KeyValue<K, V> record: inputRecords) {
-            printProcessor.process(record.key, record.value);
-        }
-        printProcessor.close();
-        assertFlushData(expectedResult, byteOutStream);
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 1074f02f..cd2031f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -108,10 +108,12 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     private void initStore(final boolean enableCaching) {
-        final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)),
-                                                                                                 Serdes.String(),
-                                                                                                 Serdes.Long())
-                                                                            .withLoggingDisabled();
+        final StoreBuilder<SessionStore<String, Long>> storeBuilder =
+            Stores.sessionStoreBuilder(
+                Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)),
+                Serdes.String(),
+                Serdes.Long())
+            .withLoggingDisabled();
 
         if (enableCaching) {
             storeBuilder.withCachingEnabled();
@@ -133,12 +135,12 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(500);
         processor.process("john", "second");
 
-        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions("john", 0, 2000);
+        final KeyValueIterator<Windowed<String>, Long> values =
+            sessionStore.findSessions("john", 0, 2000);
         assertTrue(values.hasNext());
         assertEquals(Long.valueOf(2), values.next().value);
     }
 
-
     @Test
     public void shouldMergeSessions() {
         context.setTime(0);
@@ -156,7 +158,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(GAP_MS / 2);
         processor.process(sessionId, "third");
 
-        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
+        final KeyValueIterator<Windowed<String>, Long> iterator =
+            sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
         final KeyValue<Windowed<String>, Long> kv = iterator.next();
 
         assertEquals(Long.valueOf(3), kv.value);
@@ -168,7 +171,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(0);
         processor.process("mel", "first");
         processor.process("mel", "second");
-        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("mel", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> iterator =
+            sessionStore.findSessions("mel", 0, 0);
         assertEquals(Long.valueOf(2L), iterator.next().value);
         assertFalse(iterator.hasNext());
     }
@@ -199,21 +203,22 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     }
 
-
     @Test
     public void shouldRemoveMergedSessionsFromStateStore() {
         context.setTime(0);
         processor.process("a", "1");
 
         // first ensure it is in the store
-        final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> a1 =
+            sessionStore.findSessions("a", 0, 0);
         assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a1.next());
 
         context.setTime(100);
         processor.process("a", "2");
         // a1 from above should have been removed
         // should have merged session in store
-        final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0, 100);
+        final KeyValueIterator<Windowed<String>, Long> a2 =
+            sessionStore.findSessions("a", 0, 100);
         assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 100)), 2L), a2.next());
         assertFalse(a2.hasNext());
     }
@@ -250,7 +255,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
         );
     }
 
-
     @Test
     public void shouldGetAggregatedValuesFromValueGetter() {
         final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
@@ -315,8 +319,12 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process(null, "1");
         LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
+        assertEquals(
+            1.0,
+            getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(
+            appender.getMessages(),
+            hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
     }
 
     @Test
@@ -364,9 +372,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
             )
         );
 
-        assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0));
-
-        assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
-        assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
+        assertThat(
+            (Double) metrics.metrics().get(dropRate).metricValue(),
+            greaterThan(0.0));
+        assertThat(
+            appender.getMessages(),
+            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
+        assertThat(
+            appender.getMessages(),
+            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index dd39291..c8aef07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -71,17 +70,15 @@ public class KTableImplTest {
     private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
     private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
-    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final ConsumerRecordFactory<String, String> recordFactory =
+        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Serde<String> mySerde = new Serdes.StringSerde();
 
-    private StreamsBuilder builder;
     private KTable<String, String> table;
 
-    private Serde<String> mySerde = new Serdes.StringSerde();
-
     @Before
     public void setUp() {
-        builder = new StreamsBuilder();
-        table = builder.table("test");
+        table = new StreamsBuilder().table("test");
     }
 
     @Test
@@ -96,21 +93,11 @@ public class KTableImplTest {
         final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
         table1.toStream().process(supplier);
 
-        final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
-            @Override
-            public Integer apply(final String value) {
-                return new Integer(value);
-            }
-        });
+        final KTable<String, Integer> table2 = table1.mapValues(Integer::new);
 
         table2.toStream().process(supplier);
 
-        final KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return (value % 2) == 0;
-            }
-        });
+        final KTable<String, Integer> table3 = table2.filter((key, value) -> (value % 2) == 0);
 
         table3.toStream().process(supplier);
 
@@ -142,63 +129,119 @@ public class KTableImplTest {
         final KeyValueMapper<String, String, String> selector = (key, value) -> key;
         final ValueMapper<String, String> mapper = value -> value;
         final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
-        final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier = () -> new ValueTransformerWithKey<String, String, String>() {
-            @Override
-            public void init(final ProcessorContext context) {}
-
-            @Override
-            public String transform(final String key, final String value) {
-                return value;
-            }
-
-            @Override
-            public void close() {}
-        };
-
-        assertEquals(((AbstractStream) table1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde());
-        assertEquals(((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
-        assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
-        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde());
-        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
-        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+        final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier =
+            () -> new ValueTransformerWithKey<String, String, String>() {
+                @Override
+                public void init(final ProcessorContext context) {}
+
+                @Override
+                public String transform(final String key, final String value) {
+                    return value;
+                }
 
-        assertEquals(((AbstractStream) table1.mapValues(mapper)).keySerde(), consumedInternal.keySerde());
+                @Override
+                public void close() {}
+            };
+
+        assertEquals(
+            ((AbstractStream) table1.filter((key, value) -> false)).keySerde(),
+            consumedInternal.keySerde());
+        assertEquals(
+            ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(),
+            consumedInternal.valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(
+            ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
+
+        assertEquals(
+            ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(),
+            consumedInternal.keySerde());
+        assertEquals(
+            ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(),
+            consumedInternal.valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(
+            ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
+
+        assertEquals(
+            ((AbstractStream) table1.mapValues(mapper)).keySerde(),
+            consumedInternal.keySerde());
         assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
-        assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
-        assertEquals(((AbstractStream) table1.toStream()).keySerde(), consumedInternal.keySerde());
-        assertEquals(((AbstractStream) table1.toStream()).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(
+            ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
+
+        assertEquals(
+            ((AbstractStream) table1.toStream()).keySerde(),
+            consumedInternal.keySerde());
+        assertEquals(
+            ((AbstractStream) table1.toStream()).valueSerde(),
+            consumedInternal.valueSerde());
         assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
-        assertEquals(((AbstractStream) table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.toStream(selector)).valueSerde(),
+            consumedInternal.valueSerde());
 
-        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), consumedInternal.keySerde());
+        assertEquals(
+            ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
+            consumedInternal.keySerde());
         assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
-        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
-        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde(), null);
-        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde(), null);
-        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
-
-        assertEquals(((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
-        assertEquals(((AbstractStream) table1.join(table1, joiner)).valueSerde(), null);
-        assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
-        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
-        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde(), null);
-        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
-
-        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
-        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde(), null);
-        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+        assertEquals(
+            ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
+
+        assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde());
+        assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(
+            ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
+
+        assertEquals(
+            ((AbstractStream) table1.join(table1, joiner)).keySerde(),
+            consumedInternal.keySerde());
+        assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(
+            ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
+
+        assertEquals(
+            ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(),
+            consumedInternal.keySerde());
+        assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(
+            ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
+
+        assertEquals(
+            ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(),
+            consumedInternal.keySerde());
+        assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde());
+        assertEquals(
+            ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
+            mySerde);
+        assertEquals(
+            ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
+            mySerde);
     }
 
     @Test
@@ -209,23 +252,12 @@ public class KTableImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         builder.table(topic2, consumed);
 
-        final KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
-                new ValueMapper<String, Integer>() {
-                    @Override
-                    public Integer apply(final String value) {
-                        return new Integer(value);
-                    }
-                });
-        table1Mapped.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
+        final KTableImpl<String, String, Integer> table1Mapped =
+            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
+        table1Mapped.filter((key, value) -> (value % 2) == 0);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             assertEquals(0, driver.getAllStateStores().size());
@@ -240,31 +272,15 @@ public class KTableImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         final KTableImpl<String, String, String> table2 =
-                (KTableImpl<String, String, String>) builder.table(topic2, consumed);
-
-        final KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
-                new ValueMapper<String, Integer>() {
-                    @Override
-                    public Integer apply(final String value) {
-                        return new Integer(value);
-                    }
-                });
-        final KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
-        table2.join(table1MappedFiltered,
-                new ValueJoiner<String, Integer, String>() {
-                    @Override
-                    public String apply(final String v1, final Integer v2) {
-                        return v1 + v2;
-                    }
-                });
+            (KTableImpl<String, String, String>) builder.table(topic2, consumed);
+
+        final KTableImpl<String, String, Integer> table1Mapped =
+            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
+        final KTableImpl<String, Integer, Integer> table1MappedFiltered =
+            (KTableImpl<String, Integer, Integer>) table1Mapped.filter((key, value) -> (value % 2) == 0);
+        table2.join(table1MappedFiltered, (v1, v2) -> v1 + v2);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             assertEquals(2, driver.getAllStateStores().size());
@@ -280,30 +296,37 @@ public class KTableImplTest {
             }
         }
         throw new AssertionError("No processor named '" + processorName + "'"
-                + "found in the provided Topology:\n" + topology.describe());
+            + "found in the provided Topology:\n" + topology.describe());
     }
 
     @Test
-    public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws NoSuchFieldException, IllegalAccessException {
+    public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws Exception {
         final String topic1 = "topic1";
         final String storeName1 = "storeName1";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(topic1,
-                                                                   consumed,
-                                                                   Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
-                                                                           .withKeySerde(Serdes.String())
-                                                                           .withValueSerde(Serdes.String())
-                );
-
-        table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result1"));
-
-
-        table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
-            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result2"));
+            (KTableImpl<String, String, String>) builder.table(
+                topic1,
+                consumed,
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
+            );
+
+        table1.groupBy(MockMapper.noOpKeyValueMapper())
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                MockAggregator.TOSTRING_REMOVER,
+                Materialized.as("mock-result1"));
+
+        table1.groupBy(MockMapper.noOpKeyValueMapper())
+            .reduce(
+                MockReducer.STRING_ADDER,
+                MockReducer.STRING_REMOVER,
+                Materialized.as("mock-result2"));
 
         final Topology topology = builder.build();
         try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
@@ -315,8 +338,12 @@ public class KTableImplTest {
             assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007");
             assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008");
 
-            final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
-            final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+            final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003"))
+                .getClass()
+                .getDeclaredField("valSerializer");
+            final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004"))
+                .getClass()
+                .getDeclaredField("valDeserializer");
             valSerializerField.setAccessible(true);
             valDeserializerField.setAccessible(true);
 
@@ -394,22 +421,12 @@ public class KTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
-        table.filter(new Predicate<String, String>() {
-            @Override
-            public boolean test(final String key, final String value) {
-                return false;
-            }
-        }, (Materialized) null);
+        table.filter((key, value) -> false, (Materialized) null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
-        table.filterNot(new Predicate<String, String>() {
-            @Override
-            public boolean test(final String key, final String value) {
-                return false;
-            }
-        }, (Materialized) null);
+        table.filterNot((key, value) -> false, (Materialized) null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -435,14 +452,16 @@ public class KTableImplTest {
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
-        final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
+        final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =
+            mock(ValueTransformerWithKeySupplier.class);
         table.transformValues(valueTransformerSupplier, (Materialized) null);
     }
 
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
-        final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
+        final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =
+            mock(ValueTransformerWithKeySupplier.class);
         table.transformValues(valueTransformerSupplier, (String[]) null);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 00791aa..b163915 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -47,10 +47,13 @@ import static org.junit.Assert.assertTrue;
 public class KTableMapValuesTest {
 
     private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
-    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final ConsumerRecordFactory<String, String> recordFactory =
+        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
-    private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) {
+    private void doTestKTable(final StreamsBuilder builder,
+                              final String topic1,
+                              final MockProcessorSupplier<String, Integer> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(topic1, "A", "1"));
             driver.pipeInput(recordFactory.create(topic1, "B", "2"));
@@ -82,7 +85,11 @@ public class KTableMapValuesTest {
         final String topic1 = "topic1";
 
         final KTable<String, String> table1 = builder.table(topic1, consumed);
-        final KTable<String, Integer> table2 = table1.mapValues(value -> value.charAt(0) - 48, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
+        final KTable<String, Integer> table2 = table1
+            .mapValues(
+                value -> value.charAt(0) - 48,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName")
+                    .withValueSerde(Serdes.Integer()));
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
@@ -167,11 +174,15 @@ public class KTableMapValuesTest {
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         final KTableImpl<String, String, Integer> table2 =
-            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new,
-                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2).withValueSerde(Serdes.Integer()));
+            (KTableImpl<String, String, Integer>) table1.mapValues(
+                Integer::new,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2)
+                    .withValueSerde(Serdes.Integer()));
         final KTableImpl<String, String, Integer> table3 =
-            (KTableImpl<String, String, Integer>) table1.mapValues(value -> new Integer(value) * (-1),
-                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3).withValueSerde(Serdes.Integer()));
+            (KTableImpl<String, String, Integer>) table1.mapValues(
+                value -> new Integer(value) * (-1),
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3)
+                    .withValueSerde(Serdes.Integer()));
         final KTableImpl<String, String, Integer> table4 =
             (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
 
@@ -189,8 +200,9 @@ public class KTableMapValuesTest {
         final String topic1 = "topic1";
 
         final KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+        final KTableImpl<String, String, Integer> table2 =
+            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
@@ -231,8 +243,9 @@ public class KTableMapValuesTest {
         final String topic1 = "topic1";
 
         final KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+        final KTableImpl<String, String, Integer> table2 =
+            (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new);
 
         table2.enableSendingOldValues();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 350b0d2..7da5077 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -62,8 +62,8 @@ import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 @RunWith(EasyMockRunner.class)
@@ -75,8 +75,8 @@ public class KTableTransformValuesTest {
 
     private static final Consumed<String, String> CONSUMED = Consumed.with(Serdes.String(), Serdes.String());
 
-    private final ConsumerRecordFactory<String, String> recordFactory
-        = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final ConsumerRecordFactory<String, String> recordFactory =
+        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
     private TopologyTestDriver driver;
     private MockProcessorSupplier<String, String> capture;
@@ -140,7 +140,8 @@ public class KTableTransformValuesTest {
     @Test
     public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
         final SingletonNoOpValueTransformer<String, String> transformer = new SingletonNoOpValueTransformer<>();
-        final KTableTransformValues<String, String, String> transformValues = new KTableTransformValues<>(parent, transformer, null);
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, transformer, null);
         final Processor<String, Change<String>> processor = transformValues.get();
 
         processor.init(context);
@@ -329,7 +330,7 @@ public class KTableTransformValuesTest {
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null, 0L));
 
         assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!"));
-        assertThat("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue()));
+        assertNull("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME));
     }
 
     @Test
@@ -412,21 +413,11 @@ public class KTableTransformValuesTest {
     }
 
     private static KeyValueMapper<String, Integer, KeyValue<String, Integer>> toForceSendingOfOldValues() {
-        return new KeyValueMapper<String, Integer, KeyValue<String, Integer>>() {
-            @Override
-            public KeyValue<String, Integer> apply(final String key, final Integer value) {
-                return new KeyValue<>(key, value);
-            }
-        };
+        return KeyValue::new;
     }
 
     private static ValueMapper<Integer, String> mapBackToStrings() {
-        return new ValueMapper<Integer, String>() {
-            @Override
-            public String apply(final Integer value) {
-                return value.toString();
-            }
-        };
+        return Object::toString;
     }
 
     private static StoreBuilder<KeyValueStore<Long, Long>> storeBuilder(final String storeName) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index ce0f25e..e0707ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -55,28 +54,26 @@ public class TimeWindowedKStreamImplTest {
 
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
-    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final ConsumerRecordFactory<String, String> recordFactory =
+        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private TimeWindowedKStream<String, String> windowedStream;
 
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
-        windowedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(TimeWindows.of(ofMillis(500L)));
+        windowedStream = stream.
+            groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.of(ofMillis(500L)));
     }
 
     @Test
     public void shouldCountWindowed() {
         final Map<Windowed<String>, Long> results = new HashMap<>();
-        windowedStream.count()
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.put(key, value);
-                    }
-                });
+        windowedStream
+            .count()
+            .toStream()
+            .foreach(results::put);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             processData(driver);
@@ -90,14 +87,10 @@ public class TimeWindowedKStreamImplTest {
     @Test
     public void shouldReduceWindowed() {
         final Map<Windowed<String>, String> results = new HashMap<>();
-        windowedStream.reduce(MockReducer.STRING_ADDER)
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                    }
-                });
+        windowedStream
+            .reduce(MockReducer.STRING_ADDER)
+            .toStream()
+            .foreach(results::put);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             processData(driver);
@@ -110,17 +103,14 @@ public class TimeWindowedKStreamImplTest {
     @Test
     public void shouldAggregateWindowed() {
         final Map<Windowed<String>, String> results = new HashMap<>();
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
+        windowedStream
+            .aggregate(
+                MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>with(Serdes.String(), Serdes.String()
-        ))
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                    }
-                });
+                Materialized.with(Serdes.String(), Serdes.String()))
+            .toStream()
+            .foreach(results::put);
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             processData(driver);
         }
@@ -131,14 +121,16 @@ public class TimeWindowedKStreamImplTest {
 
     @Test
     public void shouldMaterializeCount() {
-        windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
-                                     .withKeySerde(Serdes.String())
-                                     .withValueSerde(Serdes.Long()));
+        windowedStream.count(
+            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.Long()));
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             processData(driver);
             final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
-            final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+            final List<KeyValue<Windowed<String>, Long>> data =
+                StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
 
             assertThat(data, equalTo(Arrays.asList(
                     KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
@@ -149,15 +141,17 @@ public class TimeWindowedKStreamImplTest {
 
     @Test
     public void shouldMaterializeReduced() {
-        windowedStream.reduce(MockReducer.STRING_ADDER,
-                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
-                                      .withKeySerde(Serdes.String())
-                                      .withValueSerde(Serdes.String()));
+        windowedStream.reduce(
+            MockReducer.STRING_ADDER,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()));
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             processData(driver);
             final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
-            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+            final List<KeyValue<Windowed<String>, String>> data =
+                StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
 
             assertThat(data, equalTo(Arrays.asList(
                     KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
@@ -168,16 +162,19 @@ public class TimeWindowedKStreamImplTest {
 
     @Test
     public void shouldMaterializeAggregated() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
-                                         .withKeySerde(Serdes.String())
-                                         .withValueSerde(Serdes.String()));
+        windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()));
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             processData(driver);
             final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
-            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+            final List<KeyValue<Windowed<String>, String>> data =
+                StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
             assertThat(data, equalTo(Arrays.asList(
                     KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
                     KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
@@ -202,36 +199,41 @@ public class TimeWindowedKStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
-        windowedStream.aggregate(null,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+        windowedStream.aggregate(
+            null,
+            MockAggregator.TOSTRING_ADDER,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 null,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+        windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            null,
+            Materialized.as("store"));
     }
 
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 (Materialized) null);
+        windowedStream.aggregate(
+            MockInitializer.STRING_INIT,
+            MockAggregator.TOSTRING_ADDER,
+            (Materialized) null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
-        windowedStream.reduce(null,
-                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+        windowedStream.reduce(
+            null,
+            Materialized.as("store"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
-        windowedStream.reduce(MockReducer.STRING_ADDER,
-                              null);
+        windowedStream.reduce(
+            MockReducer.STRING_ADDER,
+            null);
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 4fe3c07..be64523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -431,8 +431,8 @@ public class SimpleBenchmark {
         setStreamProperties("simple-benchmark-streams-with-store");
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
-                = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
+        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder =
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
         builder.addStateStore(storeBuilder.withCachingEnabled());
 
         final KStream<Integer, byte[]> source = builder.stream(topic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index b3afa24..a84216e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -63,8 +63,8 @@ public class AbstractTaskTest {
     private final TopicPartition storeTopicPartition2 = new TopicPartition("t2", 0);
     private final TopicPartition storeTopicPartition3 = new TopicPartition("t3", 0);
     private final TopicPartition storeTopicPartition4 = new TopicPartition("t4", 0);
-    private final Collection<TopicPartition> storeTopicPartitions
-        = Utils.mkSet(storeTopicPartition1, storeTopicPartition2, storeTopicPartition3, storeTopicPartition4);
+    private final Collection<TopicPartition> storeTopicPartitions =
+        Utils.mkSet(storeTopicPartition1, storeTopicPartition2, storeTopicPartition3, storeTopicPartition4);
 
     @Before
     public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 183f9ae..475b5e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -187,8 +187,8 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testRegisterNonPersistentStore() throws IOException {
-        final MockKeyValueStore nonPersistentStore
-            = new MockKeyValueStore(nonPersistentStoreName, false); // non persistent store
+        final MockKeyValueStore nonPersistentStore =
+            new MockKeyValueStore(nonPersistentStoreName, false); // non persistent store
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             new TaskId(0, 2),
             noPartitions,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index 00b440f..3ecf6ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -107,10 +107,11 @@ public class CompositeReadOnlySessionStoreTest {
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStateStoreExceptionOnRebalance() {
-        final CompositeReadOnlySessionStore<String, String> store
-                = new CompositeReadOnlySessionStore<>(new StateStoreProviderStub(true),
-                                                      QueryableStoreTypes.<String, String>sessionStore(),
-                                                      "whateva");
+        final CompositeReadOnlySessionStore<String, String> store =
+            new CompositeReadOnlySessionStore<>(
+                new StateStoreProviderStub(true),
+                QueryableStoreTypes.<String, String>sessionStore(),
+                "whateva");
 
         store.fetch("a");
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index a80e28b..2a35342 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -82,8 +82,8 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L);
         sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L);
 
-        final List<KeyValue<Windowed<String>, Long>> expected
-                = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
+        final List<KeyValue<Windowed<String>, Long>> expected =
+            Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
 
         final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L);
         assertEquals(expected, toList(values));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a90afe7..432b9f8 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -315,8 +315,8 @@ public class TopologyTestDriver implements Closeable {
                 stateRestoreListener,
                 streamsConfig);
 
-            final GlobalProcessorContextImpl globalProcessorContext
-                = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
+            final GlobalProcessorContextImpl globalProcessorContext =
+                new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
             globalStateManager.setGlobalProcessorContext(globalProcessorContext);
 
             globalStateTask = new GlobalStateUpdateTask(


Mime
View raw message