kafka-commits mailing list archives

From: mj...@apache.org
Subject: [kafka] branch trunk updated: MINOR: improve Streams checkstyle and code cleanup (#5954)
Date: Tue, 11 Dec 2018 09:54:51 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 046b008  MINOR: improve Streams checkstyle and code cleanup (#5954)
046b008 is described below

commit 046b0087bd76637bbfd813ccef31693fa358ff2d
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Tue Dec 11 01:54:41 2018 -0800

    MINOR: improve Streams checkstyle and code cleanup (#5954)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Nikolay Izhikov <nIzhikov@gmail.com>, Ismael Juma <ismael@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../examples/temperature/TemperatureDemo.java      |   7 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |  98 +++++------
 .../java/org/apache/kafka/streams/KeyValue.java    |   3 +-
 .../apache/kafka/streams/internals/ApiUtils.java   |   6 +-
 .../org/apache/kafka/streams/kstream/Consumed.java |   4 +-
 .../apache/kafka/streams/kstream/JoinWindows.java  |  69 ++++----
 .../org/apache/kafka/streams/kstream/KStream.java  |  49 +++---
 .../org/apache/kafka/streams/kstream/KTable.java   |  12 +-
 .../apache/kafka/streams/kstream/Materialized.java |   5 +-
 .../apache/kafka/streams/kstream/Serialized.java   |   2 +-
 .../kafka/streams/kstream/SessionWindows.java      |  36 +++--
 .../kafka/streams/kstream/TimeWindowedKStream.java |   6 +-
 .../streams/kstream/TimeWindowedSerializer.java    |   6 +-
 .../apache/kafka/streams/kstream/TimeWindows.java  |  47 +++---
 .../kafka/streams/kstream/UnlimitedWindows.java    |  15 +-
 .../org/apache/kafka/streams/kstream/Window.java   |   7 +-
 .../org/apache/kafka/streams/kstream/Windowed.java |   8 +-
 .../kstream/internals/CacheFlushListener.java      |   4 +-
 .../kafka/streams/kstream/internals/Change.java    |   8 +-
 .../kstream/internals/ChangedSerializer.java       |   8 +-
 .../kstream/internals/KStreamTransform.java        |   7 +-
 .../streams/kstream/internals/KTableFilter.java    |   6 +-
 .../streams/kstream/internals/KTableMapValues.java |   3 +-
 .../streams/kstream/internals/PrintedInternal.java |   6 -
 .../streams/kstream/internals/SessionWindow.java   |   2 +-
 .../streams/kstream/internals/TimeWindow.java      |   2 +-
 .../streams/kstream/internals/UnlimitedWindow.java |   2 +-
 .../internals/suppress/EagerBufferConfigImpl.java  |   8 +-
 .../suppress/FinalResultsSuppressionBuilder.java   |   8 +-
 .../internals/suppress/StrictBufferConfigImpl.java |   8 +-
 .../internals/suppress/SuppressedInternal.java     |   8 +-
 .../apache/kafka/streams/processor/Processor.java  |   9 +-
 .../org/apache/kafka/streams/processor/TaskId.java |   7 +-
 .../processor/internals/ProcessorContextImpl.java  |   4 +-
 .../internals/ProcessorRecordContext.java          |   8 +-
 .../streams/processor/internals/QuickUnion.java    |   8 +-
 .../internals/RepartitionTopicConfig.java          |  10 +-
 .../streams/processor/internals/SourceNode.java    |   9 +-
 .../kafka/streams/processor/internals/Stamped.java |  17 +-
 .../internals/StreamsPartitionAssignor.java        |  16 +-
 .../internals/UnwindowedChangelogTopicConfig.java  |  10 +-
 .../internals/WindowedChangelogTopicConfig.java    |  10 +-
 .../internals/assignment/ClientState.java          |  11 +-
 .../internals/assignment/StickyTaskAssignor.java   |   8 +-
 .../internals/metrics/StreamsMetricsImpl.java      |   3 +-
 .../streams/state/KeyValueBytesStoreSupplier.java  |   4 +-
 .../kafka/streams/state/QueryableStoreType.java    |   2 +-
 .../kafka/streams/state/QueryableStoreTypes.java   |   7 +-
 .../kafka/streams/state/ReadOnlyWindowStore.java   |  13 +-
 .../org/apache/kafka/streams/state/Stores.java     |  11 +-
 .../kafka/streams/state/StreamsMetadata.java       |  23 ++-
 .../streams/state/WindowBytesStoreSupplier.java    |   4 +-
 .../apache/kafka/streams/state/WindowStore.java    |  24 +--
 .../internals/CompositeReadOnlyWindowStore.java    |  50 +++---
 .../streams/state/internals/ContextualRecord.java  |  10 +-
 .../InMemoryTimeOrderedKeyValueBuffer.java         |   8 +-
 .../streams/state/internals/LRUCacheEntry.java     |   8 +-
 .../streams/state/internals/MemoryLRUCache.java    |  27 ++--
 .../streams/state/internals/RocksDBStore.java      |   7 +-
 .../kafka/streams/state/internals/Segment.java     |   7 +-
 .../kafka/streams/state/internals/Segments.java    |   3 +-
 .../integration/AbstractJoinIntegrationTest.java   |  39 ++---
 .../integration/RegexSourceIntegrationTest.java    |  77 +++------
 .../integration/RestoreIntegrationTest.java        |  81 +++-------
 .../streams/integration/utils/KafkaEmbedded.java   |  17 +-
 .../kstream/internals/AbstractStreamTest.java      |   7 +-
 .../apache/kafka/streams/perf/YahooBenchmark.java  | 116 ++++++-------
 .../processor/internals/ProcessorTopologyTest.java |   6 +-
 .../internals/StreamsPartitionAssignorTest.java    | 180 ++++++++-------------
 .../CompositeReadOnlyWindowStoreTest.java          |   3 +-
 .../state/internals/ReadOnlyWindowStoreStub.java   |  63 ++++----
 .../kafka/streams/tests/SmokeTestDriver.java       |  23 ++-
 .../streams/processor/MockProcessorContext.java    |  21 ++-
 .../kafka/streams/MockProcessorContextTest.java    |   9 +-
 74 files changed, 697 insertions(+), 743 deletions(-)

diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 93480e4..4994a5d 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.examples.temperature;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -29,6 +28,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
 
+import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
@@ -90,10 +90,11 @@ public class TemperatureDemo {
             .groupByKey()
             .windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
             .reduce((value1, value2) -> {
-                if (Integer.parseInt(value1) > Integer.parseInt(value2))
+                if (Integer.parseInt(value1) > Integer.parseInt(value2)) {
                     return value1;
-                else
+                } else {
                     return value2;
+                }
             })
             .toStream()
             .filter((key, value) -> Integer.parseInt(value) > TEMPERATURE_THRESHOLD);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 420c51f..bbd0105 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import java.time.Duration;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -63,6 +62,7 @@ import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -77,7 +77,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
@@ -127,7 +126,6 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
 public class KafkaStreams implements AutoCloseable {
 
     private static final String JMX_PREFIX = "kafka.streams";
-    private static final int DEFAULT_CLOSE_TIMEOUT = 0;
 
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
@@ -135,7 +133,6 @@ public class KafkaStreams implements AutoCloseable {
     // usage only and should not be exposed to users at all.
     private final Time time;
     private final Logger log;
-    private final UUID processId;
     private final String clientId;
     private final Metrics metrics;
     private final StreamsConfig config;
@@ -386,7 +383,9 @@ public class KafkaStreams implements AutoCloseable {
             result.putAll(thread.consumerMetrics());
             result.putAll(thread.adminClientMetrics());
         }
-        if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
+        if (globalStreamThread != null) {
+            result.putAll(globalStreamThread.consumerMetrics());
+        }
         result.putAll(metrics.metrics());
         return Collections.unmodifiableMap(result);
     }
@@ -633,7 +632,7 @@ public class KafkaStreams implements AutoCloseable {
         this.time = time;
 
         // The application ID is a required config and hence should always have value
-        processId = UUID.randomUUID();
+        final UUID processId = UUID.randomUUID();
         final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         if (userClientId.length() <= 0) {
@@ -733,13 +732,10 @@ public class KafkaStreams implements AutoCloseable {
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
-        stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-            @Override
-            public Thread newThread(final Runnable r) {
-                final Thread thread = new Thread(r, clientId + "-CleanupThread");
-                thread.setDaemon(true);
-                return thread;
-            }
+        stateDirCleaner = Executors.newSingleThreadScheduledExecutor(r -> {
+            final Thread thread = new Thread(r, clientId + "-CleanupThread");
+            thread.setDaemon(true);
+            return thread;
         });
     }
 
@@ -791,12 +787,9 @@ public class KafkaStreams implements AutoCloseable {
             }
 
             final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
-            stateDirCleaner.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    if (state == State.RUNNING) {
-                        stateDirectory.cleanRemovedTasks(cleanupDelay);
-                    }
+            stateDirCleaner.scheduleAtFixedRate(() -> {
+                if (state == State.RUNNING) {
+                    stateDirectory.cleanRemovedTasks(cleanupDelay);
                 }
             }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
 
@@ -814,7 +807,7 @@ public class KafkaStreams implements AutoCloseable {
      * This will block until all threads have stopped.
      */
     public void close() {
-        close(DEFAULT_CLOSE_TIMEOUT, TimeUnit.SECONDS);
+        close(Long.MAX_VALUE);
     }
 
     /**
@@ -856,45 +849,42 @@ public class KafkaStreams implements AutoCloseable {
             // wait for all threads to join in a separate thread;
             // save the current thread so that if it is a stream thread
             // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    // notify all the threads to stop; avoid deadlocks by stopping any
-                    // further state reports from the thread since we're shutting down
-                    for (final StreamThread thread : threads) {
-                        thread.setStateListener(null);
-                        thread.shutdown();
-                    }
+            final Thread shutdownThread = new Thread(() -> {
+                // notify all the threads to stop; avoid deadlocks by stopping any
+                // further state reports from the thread since we're shutting down
+                for (final StreamThread thread : threads) {
+                    thread.setStateListener(null);
+                    thread.shutdown();
+                }
 
-                    for (final StreamThread thread : threads) {
-                        try {
-                            if (!thread.isRunning()) {
-                                thread.join();
-                            }
-                        } catch (final InterruptedException ex) {
-                            Thread.currentThread().interrupt();
+                for (final StreamThread thread : threads) {
+                    try {
+                        if (!thread.isRunning()) {
+                            thread.join();
                         }
+                    } catch (final InterruptedException ex) {
+                        Thread.currentThread().interrupt();
                     }
+                }
 
-                    if (globalStreamThread != null) {
-                        globalStreamThread.setStateListener(null);
-                        globalStreamThread.shutdown();
-                    }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setStateListener(null);
+                    globalStreamThread.shutdown();
+                }
 
-                    if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
-                        try {
-                            globalStreamThread.join();
-                        } catch (final InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                        }
-                        globalStreamThread = null;
+                if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
+                    try {
+                        globalStreamThread.join();
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
                     }
+                    globalStreamThread = null;
+                }
 
-                    adminClient.close();
+                adminClient.close();
 
-                    metrics.close();
-                    setState(State.NOT_RUNNING);
-                }
+                metrics.close();
+                setState(State.NOT_RUNNING);
             }, "kafka-streams-close-thread");
 
             shutdownThread.setDaemon(true);
@@ -918,14 +908,12 @@ public class KafkaStreams implements AutoCloseable {
      * @param timeout  how long to wait for the threads to shutdown
      * @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
      * before all threads stopped
-     * Note that this method must not be called in the {@link StateListener#onChange(State, State)} callback of {@link StateListener}.
+     * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
      */
     public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
-        ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
-
-        final long timeoutMs = timeout.toMillis();
+        final long timeoutMs = ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
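
The KafkaStreams.java hunks above replace anonymous ThreadFactory and Runnable classes with lambdas. A minimal, self-contained sketch of the same pattern (the class and method names here are invented for illustration and are not part of the commit):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class CleanerSketch {
        public static ScheduledExecutorService startCleaner(final String clientId,
                                                             final Runnable cleanupTask) {
            // ThreadFactory written as a lambda instead of an anonymous inner class
            final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(r -> {
                final Thread thread = new Thread(r, clientId + "-CleanupThread");
                thread.setDaemon(true);
                return thread;
            });
            // The scheduled action is a plain Runnable; in KafkaStreams it is a lambda
            // that first checks state == State.RUNNING before cleaning removed tasks.
            cleaner.scheduleAtFixedRate(cleanupTask, 10_000L, 10_000L, TimeUnit.MILLISECONDS);
            return cleaner;
        }
    }
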
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index 425e272..604cf7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -63,8 +63,9 @@ public class KeyValue<K, V> {
 
     @Override
     public boolean equals(final Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
+        }
 
         if (!(obj instanceof KeyValue)) {
             return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
index dd3b691..86c9fc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
@@ -36,8 +36,9 @@ public final class ApiUtils {
      */
     public static long validateMillisecondDuration(final Duration duration, final String messagePrefix) {
         try {
-            if (duration == null)
+            if (duration == null) {
                 throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
+            }
 
             return duration.toMillis();
         } catch (final ArithmeticException e) {
@@ -53,8 +54,9 @@ public final class ApiUtils {
      */
     public static long validateMillisecondInstant(final Instant instant, final String messagePrefix) {
         try {
-            if (instant == null)
+            if (instant == null) {
                 throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
+            }
 
             return instant.toEpochMilli();
         } catch (final ArithmeticException e) {
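
With the ApiUtils change above, validateMillisecondDuration(...) returns the converted millisecond value, so callers such as KafkaStreams#close(Duration) and the windowing classes validate and convert in a single expression instead of calling toMillis() separately. A small usage sketch (the class name and message prefix are illustrative only):

    import org.apache.kafka.streams.internals.ApiUtils;

    import java.time.Duration;

    public final class ValidateDurationSketch {
        public static void main(final String[] args) {
            final Duration timeout = Duration.ofSeconds(30);
            // One call: throws IllegalArgumentException for a null Duration,
            // otherwise returns its millisecond value.
            final long timeoutMs = ApiUtils.validateMillisecondDuration(timeout, "timeout ");
            System.out.println(timeoutMs); // prints 30000
        }
    }
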
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index 0af7dbe..667b621 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -121,7 +121,7 @@ public class Consumed<K, V> {
     }
 
     /**
-     * Create an instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
+     * Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
      *
      * @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
      * @param <K>         key type
@@ -166,7 +166,7 @@ public class Consumed<K, V> {
     }
 
     /**
-     * Configure the instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
+     * Configure the instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
      *
      * @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
      * @return this
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 8256890..219489f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -77,22 +77,25 @@ public final class JoinWindows extends Windows<Window> {
     /** Maximum time difference for tuples that are after the join tuple. */
     public final long afterMs;
 
-    private final Duration grace;
+    private final long graceMs;
 
-    private JoinWindows(final long beforeMs, final long afterMs, final Duration grace, final long maintainDurationMs) {
+    private JoinWindows(final long beforeMs,
+                        final long afterMs,
+                        final long graceMs,
+                        final long maintainDurationMs) {
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
-        this.grace = grace;
+        this.graceMs = graceMs;
         this.maintainDurationMs = maintainDurationMs;
     }
 
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     private JoinWindows(final long beforeMs,
                         final long afterMs,
-                        final Duration grace,
+                        final long graceMs,
                         final long maintainDurationMs,
                         final int segments) {
         super(segments);
@@ -101,7 +104,7 @@ public final class JoinWindows extends Windows<Window> {
         }
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
-        this.grace = grace;
+        this.graceMs = graceMs;
         this.maintainDurationMs = maintainDurationMs;
     }
 
@@ -117,7 +120,7 @@ public final class JoinWindows extends Windows<Window> {
     @Deprecated
     public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
         // This is a static factory method, so we initialize grace and retention to the defaults.
-        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, null, DEFAULT_RETENTION_MS);
+        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, -1L, DEFAULT_RETENTION_MS);
     }
 
     /**
@@ -128,10 +131,10 @@ public final class JoinWindows extends Windows<Window> {
      * @param timeDifference join window interval
      * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
      */
+    @SuppressWarnings("deprecation")
     public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
-        return of(timeDifference.toMillis());
+        return of(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
     }
 
     /**
@@ -145,10 +148,10 @@ public final class JoinWindows extends Windows<Window> {
      * @throws IllegalArgumentException if the resulting window size is negative
      * @deprecated Use {@link #before(Duration)} instead.
      */
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Deprecated
     public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
-        return new JoinWindows(timeDifferenceMs, afterMs, grace, maintainDurationMs, segments);
+        return new JoinWindows(timeDifferenceMs, afterMs, graceMs, maintainDurationMs, segments);
     }
 
     /**
@@ -161,11 +164,10 @@ public final class JoinWindows extends Windows<Window> {
      * @param timeDifference relative window start time
      * @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
-        return before(timeDifference.toMillis());
+        return before(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
     }
 
     /**
@@ -179,10 +181,10 @@ public final class JoinWindows extends Windows<Window> {
      * @throws IllegalArgumentException if the resulting window size is negative
      * @deprecated Use {@link #after(Duration)} instead
      */
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Deprecated
     public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
-        return new JoinWindows(beforeMs, timeDifferenceMs, grace, maintainDurationMs, segments);
+        return new JoinWindows(beforeMs, timeDifferenceMs, graceMs, maintainDurationMs, segments);
     }
 
     /**
@@ -195,11 +197,10 @@ public final class JoinWindows extends Windows<Window> {
      * @param timeDifference relative window end time
      * @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
-        return after(timeDifference.toMillis());
+        return after(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
     }
 
     /**
@@ -228,14 +229,14 @@ public final class JoinWindows extends Windows<Window> {
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEnd.toMillis() < 0) {
+        final long afterWindowEndMs = ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+        if (afterWindowEndMs < 0) {
             throw new IllegalArgumentException("Grace period must not be negative.");
         }
-        return new JoinWindows(beforeMs, afterMs, afterWindowEnd, maintainDurationMs, segments);
+        return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, maintainDurationMs, segments);
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@@ -244,7 +245,7 @@ public final class JoinWindows extends Windows<Window> {
         // NOTE: in the future, when we remove maintainMs,
         // we should default the grace period to 24h to maintain the default behavior,
         // or we can default to (24h - size) if you want to be super accurate.
-        return grace != null ? grace.toMillis() : maintainMs() - size();
+        return graceMs != -1 ? graceMs : maintainMs() - size();
     }
 
     /**
@@ -260,7 +261,7 @@ public final class JoinWindows extends Windows<Window> {
         if (durationMs < size()) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
         }
-        return new JoinWindows(beforeMs, afterMs, grace, durationMs, segments);
+        return new JoinWindows(beforeMs, afterMs, graceMs, durationMs, segments);
     }
 
     /**
@@ -271,7 +272,7 @@ public final class JoinWindows extends Windows<Window> {
      * @return the window maintain duration
      * @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}.
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings({"deprecation", "deprecatedMemberStillInUse"})
     @Override
     @Deprecated
     public long maintainMs() {
@@ -281,29 +282,33 @@ public final class JoinWindows extends Windows<Window> {
     @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final JoinWindows that = (JoinWindows) o;
         return beforeMs == that.beforeMs &&
             afterMs == that.afterMs &&
             maintainDurationMs == that.maintainDurationMs &&
             segments == that.segments &&
-            Objects.equals(grace, that.grace);
+            graceMs == that.graceMs;
     }
 
     @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
     @Override
     public int hashCode() {
-        return Objects.hash(beforeMs, afterMs, grace, maintainDurationMs, segments);
+        return Objects.hash(beforeMs, afterMs, graceMs, maintainDurationMs, segments);
     }
 
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public String toString() {
         return "JoinWindows{" +
             "beforeMs=" + beforeMs +
             ", afterMs=" + afterMs +
-            ", grace=" + grace +
+            ", graceMs=" + graceMs +
             ", maintainDurationMs=" + maintainDurationMs +
             ", segments=" + segments +
             '}';
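
JoinWindows (and, further down, SessionWindows and TimeWindows) now store the grace period as a primitive long with -1 as the "not set" sentinel, instead of a nullable Duration. A simplified sketch of that pattern, with all names invented for illustration:

    public final class GraceSketch {
        private static final long NO_GRACE = -1L;

        private final long sizeMs;
        private final long graceMs;
        private final long maintainDurationMs;

        GraceSketch(final long sizeMs, final long graceMs, final long maintainDurationMs) {
            this.sizeMs = sizeMs;
            this.graceMs = graceMs;
            this.maintainDurationMs = maintainDurationMs;
        }

        long gracePeriodMs() {
            // Fall back to the legacy retention-based default when grace was never set,
            // mirroring gracePeriodMs() in the windows classes in this commit.
            return graceMs != NO_GRACE ? graceMs : maintainDurationMs - sizeMs;
        }
    }
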
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6d83340..b961709 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -387,6 +387,7 @@ public interface KStream<K, V> {
      *
      * @param action an action to perform on each record
      * @see #process(ProcessorSupplier, String...)
+     * @return itself
      */
     KStream<K, V> peek(final ForeachAction<? super K, ? super V> action);
 
@@ -772,8 +773,8 @@ public interface KStream<K, V> {
      * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
      * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later
      * operator depends on the newly selected key.
-     * This topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -802,8 +803,8 @@ public interface KStream<K, V> {
      * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
      * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka
      * if a later operator depends on the newly selected key.
-     * This topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -863,8 +864,8 @@ public interface KStream<K, V> {
      * <p>
      * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
      * later operator depends on the newly selected key.
-     * This topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -892,8 +893,8 @@ public interface KStream<K, V> {
      * <p>
      * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
      * later operator depends on the newly selected key.
-     * This topic will be as "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is
+     * This topic will be as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -924,8 +925,8 @@ public interface KStream<K, V> {
      * <p>
      * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
      * operator depends on the newly selected key.
-     * This topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -990,9 +991,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link  StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * <p>
@@ -1068,9 +1069,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link  StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1151,9 +1152,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1301,7 +1302,7 @@ public interface KStream<K, V> {
      * <tr>
      * <td>&lt;K2:B&gt;</td>
      * <td>&lt;K2:b&gt;</td>
-     * <td>&lt;K2:ValueJoiner(null,b)&gt;<br />&lt;K2:ValueJoiner(B,b)&gt;</td>
+     * <td>&lt;K2:ValueJoiner(null,b)&gt;<br></br>&lt;K2:ValueJoiner(B,b)&gt;</td>
      * </tr>
      * <tr>
      * <td></td>
@@ -1383,7 +1384,7 @@ public interface KStream<K, V> {
      * <tr>
      * <td>&lt;K2:B&gt;</td>
      * <td>&lt;K2:b&gt;</td>
-     * <td>&lt;K2:ValueJoiner(null,b)&gt;<br />&lt;K2:ValueJoiner(B,b)&gt;</td>
+     * <td>&lt;K2:ValueJoiner(null,b)&gt;<br></br>&lt;K2:ValueJoiner(B,b)&gt;</td>
      * </tr>
      * <tr>
      * <td></td>
@@ -1419,6 +1420,8 @@ public interface KStream<K, V> {
      * @param otherStream the {@code KStream} to be joined with this stream
      * @param joiner      a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param windows     the specification of the {@link JoinWindows}
+     * @param joined      a {@link Joined} instance that defines the serdes to
+     *                    be used to serialize/deserialize inputs and outputs of the joined streams
      * @param <VO>        the value type of the other stream
      * @param <VR>        the value type of the result stream
      * @return a {@code KStream} that contains join-records for each key and values computed by the given
@@ -1733,10 +1736,12 @@ public interface KStream<K, V> {
      * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
      * correctly on its key.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
+     * @param table   the {@link KTable} to be joined with this stream
+     * @param joiner  a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param joined  a {@link Joined} instance that defines the serdes to
+     *                be used to serialize/deserialize inputs and outputs of the joined streams
+     * @param <VT>    the value type of the table
+     * @param <VR>    the value type of the result stream
      * @return a {@code KStream} that contains join-records for each key and values computed by the given
      * {@link ValueJoiner}, one output for each input {@code KStream} record
      * @see #join(KTable, ValueJoiner, Joined)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 3223b75..36cd17b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -562,8 +562,8 @@ public interface KTable<K, V> {
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
      * <p>
      * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -594,8 +594,8 @@ public interface KTable<K, V> {
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
      * <p>
      * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -628,8 +628,8 @@ public interface KTable<K, V> {
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
      * <p>
      * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},  "&lt;name&gt" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},  "&lt;name&gt;" is
      * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
      *
      * <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index a0d6e34..15bdf92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -245,14 +245,15 @@ public class Materialized<K, V, S extends StateStore> {
      * Note that the retention period must be at least long enough to contain the windowed data's entire life cycle,
      * from window-start through window-end, and for the entire grace period.
      *
+     * @param retention the retention time
      * @return itself
      * @throws IllegalArgumentException if retention is negative or can't be represented as {@code long milliseconds}
      */
     public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention, "retention");
-        ApiUtils.validateMillisecondDuration(retention, msgPrefix);
+        final long retenationMs = ApiUtils.validateMillisecondDuration(retention, msgPrefix);
 
-        if (retention.toMillis() < 0) {
+        if (retenationMs < 0) {
             throw new IllegalArgumentException("Retention must not be negative.");
         }
         this.retention = retention;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
index df9c9a1..a34fe9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.serialization.Serde;
  * @param <K> the key type
  * @param <V> the value type
  *
- *  @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.Grouped)} instead
+ *  @deprecated since 2.1. Use {@link  org.apache.kafka.streams.kstream.Grouped} instead
  */
 @Deprecated
 public class Serialized<K, V> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index bdecd8c..9c77fa5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -74,13 +74,13 @@ public final class SessionWindows {
 
     private final long gapMs;
     private final long maintainDurationMs;
-    private final Duration grace;
+    private final long graceMs;
 
 
-    private SessionWindows(final long gapMs, final long maintainDurationMs, final Duration grace) {
+    private SessionWindows(final long gapMs, final long maintainDurationMs, final long graceMs) {
         this.gapMs = gapMs;
         this.maintainDurationMs = maintainDurationMs;
-        this.grace = grace;
+        this.graceMs = graceMs;
     }
 
     /**
@@ -97,7 +97,7 @@ public final class SessionWindows {
         if (inactivityGapMs <= 0) {
             throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
         }
-        return new SessionWindows(inactivityGapMs, DEFAULT_RETENTION_MS, null);
+        return new SessionWindows(inactivityGapMs, DEFAULT_RETENTION_MS, -1);
     }
 
     /**
@@ -108,10 +108,10 @@ public final class SessionWindows {
      *
      * @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
      */
+    @SuppressWarnings("deprecation")
     public static SessionWindows with(final Duration inactivityGap) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
-        ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix);
-        return with(inactivityGap.toMillis());
+        return with(ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix));
     }
 
     /**
@@ -131,7 +131,7 @@ public final class SessionWindows {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
         }
 
-        return new SessionWindows(gapMs, durationMs, grace);
+        return new SessionWindows(gapMs, durationMs, graceMs);
     }
 
     /**
@@ -148,16 +148,16 @@ public final class SessionWindows {
      */
     public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+        final long afterWindowEndMs = ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
 
-        if (afterWindowEnd.toMillis() < 0) {
+        if (afterWindowEndMs < 0) {
             throw new IllegalArgumentException("Grace period must not be negative.");
         }
 
         return new SessionWindows(
             gapMs,
             maintainDurationMs,
-            afterWindowEnd
+            afterWindowEndMs
         );
     }
 
@@ -167,7 +167,7 @@ public final class SessionWindows {
         // NOTE: in the future, when we remove maintainMs,
         // we should default the grace period to 24h to maintain the default behavior,
         // or we can default to (24h - gapMs) if you want to be super accurate.
-        return grace != null ? grace.toMillis() : maintainMs() - inactivityGap();
+        return graceMs != -1 ? graceMs : maintainMs() - inactivityGap();
     }
 
     /**
@@ -195,17 +195,21 @@ public final class SessionWindows {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final SessionWindows that = (SessionWindows) o;
         return gapMs == that.gapMs &&
             maintainDurationMs == that.maintainDurationMs &&
-            Objects.equals(grace, that.grace);
+            graceMs == that.graceMs;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(gapMs, maintainDurationMs, grace);
+        return Objects.hash(gapMs, maintainDurationMs, graceMs);
     }
 
     @Override
@@ -213,7 +217,7 @@ public final class SessionWindows {
         return "SessionWindows{" +
             "gapMs=" + gapMs +
             ", maintainDurationMs=" + maintainDurationMs +
-            ", grace=" + grace +
+            ", graceMs=" + graceMs +
             '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index d6f4082..d209b6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -41,7 +41,7 @@ import java.time.Duration;
  * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
  * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
  *
- * New events are added to windows until their grace period ends (see {@link Windows#grace(Duration)}).
+ * New events are added to windows until their grace period ends (see {@link TimeWindows#grace(Duration)}).
  *
  * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -188,7 +188,6 @@ public interface TimeWindowedKStream<K, V> {
      * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
      * count (c.f. {@link #count()}).
      * <p>
-     * <p>
      * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
      * the same window and key if caching is enabled on the {@link Materialized} instance.
      * When caching is enable the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
@@ -312,7 +311,8 @@ public interface TimeWindowedKStream<K, V> {
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @param reducer       a {@link Reducer} that computes a new aggregate result
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index 6b75419..72cdcb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -37,6 +37,7 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
     private Serializer<T> inner;
 
     // Default constructor needed by Kafka
+    @SuppressWarnings("WeakerAccess")
     public TimeWindowedSerializer() {}
 
     public TimeWindowedSerializer(final Serializer<T> inner) {
@@ -50,7 +51,7 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
             final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
             final String value = (String) configs.get(propertyName);
             try {
-                inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).serializer();
+                inner = Utils.newInstance(value, Serde.class).serializer();
                 inner.configure(configs, isKey);
             } catch (final ClassNotFoundException e) {
                 throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
@@ -60,8 +61,9 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
 
     @Override
     public byte[] serialize(final String topic, final Windowed<T> data) {
-        if (data == null)
+        if (data == null) {
             return null;
+        }
 
         return WindowKeySchema.toBinary(data, inner, topic);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 942b54d..03203f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -60,19 +60,21 @@ public final class TimeWindows extends Windows<TimeWindow> {
     private final long maintainDurationMs;
 
     /** The size of the windows in milliseconds. */
+    @SuppressWarnings("WeakerAccess")
     public final long sizeMs;
 
     /**
      * The size of the window's advance interval in milliseconds, i.e., by how much a window moves forward relative to
      * the previous one.
      */
+    @SuppressWarnings("WeakerAccess")
     public final long advanceMs;
-    private final Duration grace;
+    private final long graceMs;
 
-    private TimeWindows(final long sizeMs, final long advanceMs, final Duration grace, final long maintainDurationMs) {
+    private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs, final long maintainDurationMs) {
         this.sizeMs = sizeMs;
         this.advanceMs = advanceMs;
-        this.grace = grace;
+        this.graceMs = graceMs;
         this.maintainDurationMs = maintainDurationMs;
     }
 
@@ -81,13 +83,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @Deprecated
     private TimeWindows(final long sizeMs,
                         final long advanceMs,
-                        final Duration grace,
+                        final long graceMs,
                         final long maintainDurationMs,
                         final int segments) {
         super(segments);
         this.sizeMs = sizeMs;
         this.advanceMs = advanceMs;
-        this.grace = grace;
+        this.graceMs = graceMs;
         this.maintainDurationMs = maintainDurationMs;
     }
 
@@ -110,7 +112,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
             throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
         }
         // This is a static factory method, so we initialize grace and retention to the defaults.
-        return new TimeWindows(sizeMs, sizeMs, null, DEFAULT_RETENTION_MS);
+        return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
     }
 
     /**
@@ -125,10 +127,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @return a new window definition with default maintain duration of 1 day
      * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
      */
+    @SuppressWarnings("deprecation")
     public static TimeWindows of(final Duration size) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
-        ApiUtils.validateMillisecondDuration(size, msgPrefix);
-        return of(size.toMillis());
+        return of(ApiUtils.validateMillisecondDuration(size, msgPrefix));
     }
 
     /**
@@ -150,7 +152,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
             throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " +
                     "and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs));
         }
-        return new TimeWindows(sizeMs, advanceMs, grace, maintainDurationMs, segments);
+        return new TimeWindows(sizeMs, advanceMs, graceMs, maintainDurationMs, segments);
     }
 
     /**
@@ -167,8 +169,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
     public TimeWindows advanceBy(final Duration advance) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
-        ApiUtils.validateMillisecondDuration(advance, msgPrefix);
-        return advanceBy(advance.toMillis());
+        return advanceBy(ApiUtils.validateMillisecondDuration(advance, msgPrefix));
     }
 
     @Override
@@ -201,12 +202,12 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
     public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEnd.toMillis() < 0) {
+        final long afterWindowEndMs = ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+        if (afterWindowEndMs < 0) {
             throw new IllegalArgumentException("Grace period must not be negative.");
         }
 
-        return new TimeWindows(sizeMs, advanceMs, afterWindowEnd, maintainDurationMs, segments);
+        return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, maintainDurationMs, segments);
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@@ -215,7 +216,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
         // NOTE: in the future, when we remove maintainMs,
         // we should default the grace period to 24h to maintain the default behavior,
         // or we can default to (24h - size) if you want to be super accurate.
-        return grace != null ? grace.toMillis() : maintainMs() - size();
+        return graceMs != -1 ? graceMs : maintainMs() - size();
     }
 
     /**
@@ -233,7 +234,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
         if (durationMs < sizeMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
         }
-        return new TimeWindows(sizeMs, advanceMs, grace, durationMs, segments);
+        return new TimeWindows(sizeMs, advanceMs, graceMs, durationMs, segments);
     }
 
     /**
@@ -254,20 +255,24 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final TimeWindows that = (TimeWindows) o;
         return maintainDurationMs == that.maintainDurationMs &&
             segments == that.segments &&
             sizeMs == that.sizeMs &&
             advanceMs == that.advanceMs &&
-            Objects.equals(grace, that.grace);
+            graceMs == that.graceMs;
     }
 
     @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
     @Override
     public int hashCode() {
-        return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, grace);
+        return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, graceMs);
     }
 
     @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@@ -277,7 +282,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
             "maintainDurationMs=" + maintainDurationMs +
             ", sizeMs=" + sizeMs +
             ", advanceMs=" + advanceMs +
-            ", grace=" + grace +
+            ", graceMs=" + graceMs +
             ", segments=" + segments +
             '}';
     }
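
Nothing about the public surface changes here (grace is simply tracked as a primitive), so the usual fluent construction still applies; a minimal sketch with arbitrary durations:

    import java.time.Duration;

    import org.apache.kafka.streams.kstream.TimeWindows;

    public class HoppingWindowExample {
        public static void main(final String[] args) {
            final TimeWindows hopping = TimeWindows
                .of(Duration.ofMinutes(5))        // window size
                .advanceBy(Duration.ofMinutes(1)) // hop interval
                .grace(Duration.ofSeconds(30));   // how long to accept late records after window end
            // toString() now reports graceMs; if grace() is never called, the grace period
            // falls back to maintainMs() - size() as noted in the comment above
            System.out.println(hopping);
        }
    }
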
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 0a45d81..e1894ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -16,11 +16,11 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import java.time.Instant;
 import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -47,6 +47,7 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
     private static final long DEFAULT_START_TIMESTAMP_MS = 0L;
 
     /** The start timestamp of the window. */
+    @SuppressWarnings("WeakerAccess")
     public final long startMs;
 
     private UnlimitedWindows(final long startMs) {
@@ -83,10 +84,10 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
      * @return a new unlimited window that starts at {@code start}
      * @throws IllegalArgumentException if the start time is negative or can't be represented as {@code long milliseconds}
      */
+    @SuppressWarnings("deprecation")
     public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start");
-        ApiUtils.validateMillisecondInstant(start, msgPrefix);
-        return startOn(start.toEpochMilli());
+        return startOn(ApiUtils.validateMillisecondInstant(start, msgPrefix));
     }
 
     @Override
@@ -148,8 +149,12 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
     @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final UnlimitedWindows that = (UnlimitedWindows) o;
         return startMs == that.startMs && segments == that.segments;
     }
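
For completeness, the Instant-based overload simplified above is used like this (the start time is arbitrary):

    import java.time.Instant;

    import org.apache.kafka.streams.kstream.UnlimitedWindows;

    public class UnlimitedWindowExample {
        public static void main(final String[] args) {
            // a single window from the given start timestamp onwards
            final UnlimitedWindows windows =
                UnlimitedWindows.of().startOn(Instant.ofEpochMilli(0L));
            System.out.println(windows.startMs); // 0
        }
    }
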
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index ac49174..432bb45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -16,9 +16,10 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import java.time.Instant;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
+import java.time.Instant;
+
 /**
  * A single window instance, defined by its start and end timestamp.
  * {@code Window} is agnostic if start/end boundaries are inclusive or exclusive; this is defined by concrete
@@ -64,6 +65,8 @@ public abstract class Window {
 
     /**
      * Return the start timestamp of this window.
+     *
+     * @return The start timestamp of this window.
      */
     public long start() {
         return startMs;
@@ -71,6 +74,8 @@ public abstract class Window {
 
     /**
      * Return the end timestamp of this window.
+     *
+     * @return The end timestamp of this window.
      */
     public long end() {
         return endMs;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 7728317..d830f58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -65,12 +65,12 @@ public class Windowed<K> {
 
     @Override
     public boolean equals(final Object obj) {
-        if (obj == this)
+        if (obj == this) {
             return true;
-
-        if (!(obj instanceof Windowed))
+        }
+        if (!(obj instanceof Windowed)) {
             return false;
-
+        }
         final Windowed<?> that = (Windowed) obj;
         return window.equals(that.window) && key.equals(that.key);
     }
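
A minimal sketch of the accessors whose javadoc is touched above; it builds a Windowed key directly from the internal TimeWindow class purely for illustration:

    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.internals.TimeWindow;

    public class WindowedKeyExample {
        public static void main(final String[] args) {
            // a one-minute window keyed by "user-1"
            final Windowed<String> windowedKey = new Windowed<>("user-1", new TimeWindow(0L, 60_000L));
            System.out.println(windowedKey.key());            // user-1
            System.out.println(windowedKey.window().start()); // 0
            System.out.println(windowedKey.window().end());   // 60000
        }
    }
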
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
index 8544164..7cd5a3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
@@ -20,8 +20,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 /**
  * Listen to cache flush events
- * @param <K>
- * @param <V>
+ * @param <K> key type
+ * @param <V> value type
  */
 public interface CacheFlushListener<K, V> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
index d513f2b..c9a18de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -35,8 +35,12 @@ public class Change<T> {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final Change<?> change = (Change<?>) o;
         return Objects.equals(newValue, change.newValue) &&
                 Objects.equals(oldValue, change.oldValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 8a76bad..7fa34b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -56,14 +56,16 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
 
         // only one of the old / new values would be not null
         if (data.newValue != null) {
-            if (data.oldValue != null)
+            if (data.oldValue != null) {
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                        + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+            }
 
             serializedKey = inner.serialize(topic, headers, data.newValue);
         } else {
-            if (data.oldValue == null)
+            if (data.oldValue == null) {
                 throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
+            }
 
             serializedKey = inner.serialize(topic, headers, data.oldValue);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 8e91469..825db52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -28,7 +28,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
     private final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier;
 
-    public KStreamTransform(final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier) {
+    KStreamTransform(final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier) {
         this.transformerSupplier = transformerSupplier;
     }
 
@@ -41,7 +41,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
         private final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer;
 
-        public KStreamTransformProcessor(final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer) {
+        KStreamTransformProcessor(final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer) {
             this.transformer = transformer;
         }
 
@@ -55,8 +55,9 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
         public void process(final K1 key, final V1 value) {
             final KeyValue<? extends K2, ? extends V2> pair = transformer.transform(key, value);
 
-            if (pair != null)
+            if (pair != null) {
                 context().forward(pair.key, pair.value);
+            }
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index d1e524c..1312e2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -54,8 +54,9 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     private V computeValue(final K key, final V value) {
         V newValue = null;
 
-        if (value != null && (filterNot ^ predicate.test(key, value)))
+        if (value != null && (filterNot ^ predicate.test(key, value))) {
             newValue = value;
+        }
 
         return newValue;
     }
@@ -79,8 +80,9 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
             final V newValue = computeValue(key, change.newValue);
             final V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
 
-            if (sendOldValues && oldValue == null && newValue == null)
+            if (sendOldValues && oldValue == null && newValue == null) {
                 return; // unnecessary to forward here.
+            }
 
             if (queryableName != null) {
                 store.put(key, newValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 2317947..aae1437 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -74,8 +74,9 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
     private V1 computeValue(final K key, final V value) {
         V1 newValue = null;
 
-        if (value != null)
+        if (value != null) {
             newValue = mapper.apply(key, value);
+        }
 
         return newValue;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
index 45e2513..fe961ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
@@ -25,11 +24,6 @@ public class PrintedInternal<K, V> extends Printed<K, V> {
         super(printed);
     }
 
-    /**
-     * Builds the {@link ProcessorSupplier} that will be used to print the records flowing through a {@link KStream}.
-     *
-     * @return the {@code ProcessorSupplier} to be used for printing
-     */
     public ProcessorSupplier<K, V> build(final String processorName) {
         return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
index 40cc880..8111cdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
@@ -48,7 +48,7 @@ public final class SessionWindow extends Window {
      *
      * @param other another window
      * @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
-     * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+     * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window
      */
     public boolean overlap(final Window other) throws IllegalArgumentException {
         if (getClass() != other.getClass()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
index 2823cf9..ac71282 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -55,7 +55,7 @@ public class TimeWindow extends Window {
      *
      * @param other another window
      * @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
-     * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+     * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window
      */
     @Override
     public boolean overlap(final Window other) throws IllegalArgumentException {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index b3f8c3c..12125ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -51,7 +51,7 @@ public class UnlimitedWindow extends Window {
      *
      * @param other another window
      * @return {@code true}
-     * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+     * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window
      */
     @Override
     public boolean overlap(final Window other) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index 161f934..90456d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -57,8 +57,12 @@ public class EagerBufferConfigImpl extends BufferConfigInternal {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
         return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index 4bab6d3..af1e0d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -48,8 +48,12 @@ public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppr
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final FinalResultsSuppressionBuilder<?> that = (FinalResultsSuppressionBuilder<?>) o;
         return Objects.equals(name, that.name) &&
             Objects.equals(bufferConfig, that.bufferConfig);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index ef754ec..30427b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -69,8 +69,12 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
         return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index 7453475..042a81a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -71,8 +71,12 @@ public class SuppressedInternal<K> implements Suppressed<K> {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final SuppressedInternal<?> that = (SuppressedInternal<?>) o;
         return suppressTombstones == that.suppressTombstones &&
             Objects.equals(name, that.name) &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index f91f22f..c10af9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -16,9 +16,10 @@
  */
 package org.apache.kafka.streams.processor;
 
-import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
+import java.time.Duration;
+
 /**
  * A processor of key-value pair records.
  *
@@ -31,7 +32,7 @@ public interface Processor<K, V> {
     /**
      * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
      * that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the
-     * framework may later re-use the processor by calling {@link #init()} again.
+     * framework may later re-use the processor by calling {@code #init()} again.
      * <p>
      * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
      * {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) schedule} a method to be
@@ -50,9 +51,9 @@ public interface Processor<K, V> {
     void process(K key, V value);
 
     /**
-     * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
+     * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
      * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may
-     * later re-use this processor by calling {@link #init()} on it again.
+     * later re-use this processor by calling {@code #init()} on it again.
      * <p>
      * Note: Do not close any streams managed resources, like {@link StateStore}s here, as they are managed by the library.
      */
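
The lifecycle described above (init, process, close, with init possibly called again on re-use) looks roughly as follows in a processor implementation; this is a sketch, and the punctuation interval is arbitrary:

    import java.time.Duration;

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;

    public class UppercaseProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
            // schedule a wall-clock punctuation once per minute via the Duration overload
            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> this.context.commit());
        }

        @Override
        public void process(final String key, final String value) {
            context.forward(key, value == null ? null : value.toUpperCase());
        }

        @Override
        public void close() {
            // no Kafka writes possible here, and managed StateStores must not be closed
        }
    }
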
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 79919a6..9d1b82c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -47,7 +47,9 @@ public class TaskId implements Comparable<TaskId> {
      */
     public static TaskId parse(final String taskIdStr) {
         final int index = taskIdStr.indexOf('_');
-        if (index <= 0 || index + 1 >= taskIdStr.length()) throw new TaskIdFormatException(taskIdStr);
+        if (index <= 0 || index + 1 >= taskIdStr.length()) {
+            throw new TaskIdFormatException(taskIdStr);
+        }
 
         try {
             final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
@@ -85,8 +87,9 @@ public class TaskId implements Comparable<TaskId> {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o)
+        if (this == o) {
             return true;
+        }
 
         if (o instanceof TaskId) {
             final TaskId other = (TaskId) o;
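
The string format guarded by the parse() check above is "<topicGroupId>_<partition>"; a minimal sketch:

    import org.apache.kafka.streams.processor.TaskId;

    public class TaskIdParseExample {
        public static void main(final String[] args) {
            final TaskId id = TaskId.parse("2_1");
            System.out.println(id.topicGroupId); // 2
            System.out.println(id.partition);    // 1
            // an input without the underscore separator, e.g. "21", throws a TaskIdFormatException
        }
    }
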
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index e7dd4db..f3e5160 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -189,13 +189,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return task.schedule(interval, type, callback);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
-        ApiUtils.validateMillisecondDuration(interval, msgPrefix);
-        return schedule(interval.toMillis(), type, callback);
+        return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
     }
 
     void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index cd4657b..7e1466e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -98,8 +98,12 @@ public class ProcessorRecordContext implements RecordContext {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final ProcessorRecordContext that = (ProcessorRecordContext) o;
         return timestamp == that.timestamp &&
                 offset == that.offset &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
index 3cd3e90..f91bdcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
@@ -38,8 +38,9 @@ public class QuickUnion<T> {
         T current = id;
         T parent = ids.get(current);
 
-        if (parent == null)
+        if (parent == null) {
             throw new NoSuchElementException("id: " + id.toString());
+        }
 
         while (!parent.equals(current)) {
             // do the path splitting
@@ -53,7 +54,7 @@ public class QuickUnion<T> {
     }
 
     @SuppressWarnings("unchecked")
-    public void unite(final T id1, final T... idList) {
+    void unite(final T id1, final T... idList) {
         for (final T id2 : idList) {
             unitePair(id1, id2);
         }
@@ -63,8 +64,9 @@ public class QuickUnion<T> {
         final T root1 = root(id1);
         final T root2 = root(id2);
 
-        if (!root1.equals(root2))
+        if (!root1.equals(root2)) {
             ids.put(root1, root2);
+        }
     }
 
 }
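
QuickUnion is an internal union-find helper, so it is not meant to be used directly; a standalone sketch of the same root-with-path-splitting idea, for orientation only:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.NoSuchElementException;

    public class QuickUnionSketch<T> {
        private final Map<T, T> ids = new HashMap<>();

        public void add(final T id) {
            ids.put(id, id); // every element starts out as its own root
        }

        public T root(T current) {
            T parent = ids.get(current);
            if (parent == null) {
                throw new NoSuchElementException("id: " + current);
            }
            while (!parent.equals(current)) {
                // path splitting: re-point the current node at its grandparent while walking up
                final T grandparent = ids.get(parent);
                ids.put(current, grandparent);
                current = parent;
                parent = grandparent;
            }
            return current;
        }

        public void unite(final T id1, final T id2) {
            final T root1 = root(id1);
            final T root2 = root(id2);
            if (!root1.equals(root2)) {
                ids.put(root1, root2); // merge the two trees by linking the roots
            }
        }
    }
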
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
index ca8fbff..a7a8d88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -40,7 +40,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
         REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
     }
 
-    public RepartitionTopicConfig(final String name, final Map<String, String> topicConfigs) {
+    RepartitionTopicConfig(final String name, final Map<String, String> topicConfigs) {
         super(name, topicConfigs);
     }
 
@@ -64,8 +64,12 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final RepartitionTopicConfig that = (RepartitionTopicConfig) o;
         return Objects.equals(name, that.name) &&
                Objects.equals(topicConfigs, that.topicConfigs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index d7627fe..87505ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -67,15 +67,18 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         this.context = context;
 
         // if deserializers are null, get the default ones from the context
-        if (this.keyDeserializer == null)
+        if (this.keyDeserializer == null) {
             this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
-        if (this.valDeserializer == null)
+        }
+        if (this.valDeserializer == null) {
             this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+        }
 
         // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
         if (this.valDeserializer instanceof ChangedDeserializer &&
-                ((ChangedDeserializer) this.valDeserializer).inner() == null)
+                ((ChangedDeserializer) this.valDeserializer).inner() == null) {
             ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
+        }
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
index ba90558..cbbb244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
@@ -23,7 +23,7 @@ public class Stamped<V> implements Comparable {
     public final V value;
     public final long timestamp;
 
-    public Stamped(final V value, final long timestamp) {
+    Stamped(final V value, final long timestamp) {
         this.value = value;
         this.timestamp = timestamp;
     }
@@ -32,18 +32,21 @@ public class Stamped<V> implements Comparable {
     public int compareTo(final Object other) {
         final long otherTimestamp = ((Stamped<?>) other).timestamp;
 
-        if (timestamp < otherTimestamp) return -1;
-        else if (timestamp > otherTimestamp) return 1;
+        if (timestamp < otherTimestamp) {
+            return -1;
+        } else if (timestamp > otherTimestamp) {
+            return 1;
+        }
         return 0;
     }
 
     @Override
     public boolean equals(final Object other) {
-
-        if (other == null || getClass() != other.getClass()) return false;
-
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
         final long otherTimestamp = ((Stamped<?>) other).timestamp;
-        return Long.compare(timestamp, otherTimestamp) == 0;
+        return timestamp == otherTimestamp;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 493f56b..2f649bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -304,10 +304,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 final String host = getHost(userEndPoint);
                 final Integer port = getPort(userEndPoint);
 
-                if (host == null || port == null)
+                if (host == null || port == null) {
                     throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
-                                    " but received %s",
-                            logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+                            " but received %s",
+                        logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+                }
             } catch (final NumberFormatException nfe) {
                 throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
                         logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
@@ -348,9 +349,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
-    Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
-                                            final String topic,
-                                            final int errorCode) {
+    private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
+                                                    final String topic,
+                                                    final int errorCode) {
         log.error("{} is unknown yet during rebalance," +
             " please make sure they have been pre-created before starting the Streams application.", topic);
         final Map<String, Assignment> assignment = new HashMap<>();
@@ -598,8 +599,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 int numPartitions = UNKNOWN;
                 if (tasksByTopicGroup.get(topicGroupId) != null) {
                     for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) {
-                        if (numPartitions < task.partition + 1)
+                        if (numPartitions < task.partition + 1) {
                             numPartitions = task.partition + 1;
+                        }
                     }
                     final InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
                     topicMetadata.numPartitions = numPartitions;
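
The host:port validation reformatted above applies to the application.server setting; a minimal sketch of a valid configuration (application id and bootstrap servers are hypothetical):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class ApplicationServerConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // must be a host:port pair, otherwise the assignor throws a ConfigException
            props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
        }
    }
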
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
index 4606a56..e55ce71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
@@ -35,7 +35,7 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig {
         UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
     }
 
-    public UnwindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
+    UnwindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
         super(name, topicConfigs);
     }
 
@@ -59,8 +59,12 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final UnwindowedChangelogTopicConfig that = (UnwindowedChangelogTopicConfig) o;
         return Objects.equals(name, that.name) &&
                Objects.equals(topicConfigs, that.topicConfigs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
index d42642a..e177bea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
@@ -37,7 +37,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
 
     private Long retentionMs;
 
-    public WindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
+    WindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
         super(name, topicConfigs);
     }
 
@@ -71,8 +71,12 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final WindowedChangelogTopicConfig that = (WindowedChangelogTopicConfig) o;
         return Objects.equals(name, that.name) &&
                 Objects.equals(topicConfigs, that.topicConfigs) &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 66e655f..6eff7bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -37,7 +37,7 @@ public class ClientState {
     }
 
     ClientState(final int capacity) {
-        this(new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), capacity);
+        this(new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), capacity);
     }
 
     private ClientState(final Set<TaskId> activeTasks,
@@ -93,6 +93,7 @@ public class ClientState {
         return prevStandbyTasks;
     }
 
+    @SuppressWarnings("WeakerAccess")
     public int assignedTaskCount() {
         return assignedTasks.size();
     }
@@ -101,6 +102,7 @@ public class ClientState {
         capacity++;
     }
 
+    @SuppressWarnings("WeakerAccess")
     public int activeTaskCount() {
         return activeTasks.size();
     }
@@ -143,12 +145,13 @@ public class ClientState {
         final double otherLoad = (double) other.assignedTaskCount() / other.capacity;
         final double thisLoad = (double) assignedTaskCount() / capacity;
 
-        if (thisLoad < otherLoad)
+        if (thisLoad < otherLoad) {
             return true;
-        else if (thisLoad > otherLoad)
+        } else if (thisLoad > otherLoad) {
             return false;
-        else
+        } else {
             return capacity > other.capacity;
+        }
     }
 
     Set<TaskId> previousStandbyTasks() {
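
ClientState is internal, but the load comparison reformatted above is easy to state on its own; a standalone sketch with made-up numbers:

    public class ClientLoadComparisonSketch {

        // true if the first client has more spare capacity; ties go to the larger capacity
        static boolean hasMoreAvailableCapacity(final int assignedTasks, final int capacity,
                                                final int otherAssignedTasks, final int otherCapacity) {
            final double thisLoad = (double) assignedTasks / capacity;
            final double otherLoad = (double) otherAssignedTasks / otherCapacity;
            if (thisLoad < otherLoad) {
                return true;
            } else if (thisLoad > otherLoad) {
                return false;
            } else {
                return capacity > otherCapacity;
            }
        }

        public static void main(final String[] args) {
            System.out.println(hasMoreAvailableCapacity(2, 4, 2, 2)); // true  (load 0.5 vs 1.0)
            System.out.println(hasMoreAvailableCapacity(3, 2, 4, 4)); // false (load 1.5 vs 1.0)
        }
    }
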
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 69c2ba2..157497d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -296,8 +296,12 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
 
             @Override
             public boolean equals(final Object o) {
-                if (this == o) return true;
-                if (o == null || getClass() != o.getClass()) return false;
+                if (this == o) {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass()) {
+                    return false;
+                }
                 final Pair pair = (Pair) o;
                 return Objects.equals(task1, pair.task1) &&
                         Objects.equals(task2, pair.task2);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index dd6cc4a..20c0be9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -267,8 +267,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                 throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
             }
 
-            for (int i = 0; i < tags.length; i += 2)
+            for (int i = 0; i < tags.length; i += 2) {
                 tagMap.put(tags[i], tags[i + 1]);
+            }
         }
         tagMap.put("client-id", threadName);
         return tagMap;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
index ee645b1..0c4ea12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.utils.Bytes;
 
 /**
- * A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
+ * A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore&lt;Byte, byte[]&gt;} instances of type &lt;Byte, byte[]&gt;.
  *
- * For any stores implementing the {@link KeyValueStore KeyValueStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ * For any stores implementing the {@link KeyValueStore KeyValueStore&lt;Byte, byte[]&gt;} interface, null value bytes are considered as "not exist". This means:
  *
  * 1. Null value bytes in put operations should be treated as delete.
  * 2. If the key does not exist, get operations should return null value bytes.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 6ba6672..3d4f55c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -43,7 +43,7 @@ public interface QueryableStoreType<T> {
      *
      * @param storeProvider     provides access to all the underlying StateStore instances
      * @param storeName         The name of the Store
-     * @return a read-only interface over a {@code StateStore} (cf. {@link QueryableStoreTypes.KeyValueStoreType})
+     * @return a read-only interface over a {@code StateStore} (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
      */
     T create(final StateStoreProvider storeProvider, final String storeName);
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index f7a9970..d33e324 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -74,8 +74,7 @@ public class QueryableStoreTypes {
         }
     }
 
-    private static class KeyValueStoreType<K, V> extends
-                                                 QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
+    static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
         KeyValueStoreType() {
             super(ReadOnlyKeyValueStore.class);
         }
@@ -88,7 +87,7 @@ public class QueryableStoreTypes {
 
     }
 
-    private static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
+    static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
         WindowStoreType() {
             super(ReadOnlyWindowStore.class);
         }
@@ -100,7 +99,7 @@ public class QueryableStoreTypes {
         }
     }
 
-    private static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
+    static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
         SessionStoreType() {
             super(ReadOnlySessionStore.class);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index 0804338..5393fe1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -16,10 +16,11 @@
  */
 package org.apache.kafka.streams.state;
 
-import java.time.Instant;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 
+import java.time.Instant;
+
 /**
  * A window store that only supports read operations.
  * Implementations should be thread-safe as concurrent reads and writes are expected.
@@ -47,7 +48,6 @@ public interface ReadOnlyWindowStore<K, V> {
      * <p>
      * The time range is inclusive and applies to the starting timestamp of the window.
      * For example, if we have the following windows:
-     * <p>
      * <pre>
      * +-------------------------------+
      * |  key  | start time | end time |
@@ -62,7 +62,7 @@ public interface ReadOnlyWindowStore<K, V> {
      * +--------------------------------
      * </pre>
      * And we call {@code store.fetch("A", 10, 20)} then the results will contain the first
-     * three windows from the table above, i.e., all those where 10 <= start time <= 20.
+     * three windows from the table above, i.e., all those where 10 &lt;= start time &lt;= 20.
      * <p>
      * For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
      * available window to the newest/latest window.
@@ -73,7 +73,7 @@ public interface ReadOnlyWindowStore<K, V> {
      * @return an iterator over key-value pairs {@code <timestamp, value>}
      * @throws InvalidStateStoreException if the store is not initialized
      * @throws NullPointerException If {@code null} is used for key.
-     * @deprecated Use {@link #fetch(K, Instant, Instant)} instead
+     * @deprecated Use {@link #fetch(Object, Instant, Instant)} instead
      */
     @Deprecated
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@@ -85,7 +85,6 @@ public interface ReadOnlyWindowStore<K, V> {
      * <p>
      * The time range is inclusive and applies to the starting timestamp of the window.
      * For example, if we have the following windows:
-     * <p>
      * <pre>
      * +-------------------------------+
      * |  key  | start time | end time |
@@ -100,14 +99,14 @@ public interface ReadOnlyWindowStore<K, V> {
      * +--------------------------------
      * </pre>
      * And we call {@code store.fetch("A", Instant.ofEpochMilli(10), Instant.ofEpochMilli(20))} then the results will contain the first
-     * three windows from the table above, i.e., all those where 10 <= start time <= 20.
+     * three windows from the table above, i.e., all those where 10 &lt;= start time &lt;= 20.
      * <p>
      * For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
      * available window to the newest/latest window.
      *
      * @param key       the key to fetch
      * @param from      time range start (inclusive)
-     * @param from      time range end (inclusive)
+     * @param to        time range end (inclusive)
      * @return an iterator over key-value pairs {@code <timestamp, value>}
      * @throws InvalidStateStoreException if the store is not initialized
      * @throws NullPointerException If {@code null} is used for key.
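
A minimal sketch of the corrected @param pair above in use, via the non-deprecated Instant overload; the store instance would normally come from KafkaStreams#store(), and the key and value types here are hypothetical:

    import java.time.Instant;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class WindowFetchExample {

        static void printWindows(final ReadOnlyWindowStore<String, Long> store) {
            // both bounds are inclusive and apply to the window start timestamp
            try (final WindowStoreIterator<Long> iter =
                     store.fetch("A", Instant.ofEpochMilli(10), Instant.ofEpochMilli(20))) {
                while (iter.hasNext()) {
                    final KeyValue<Long, Long> entry = iter.next(); // <window start, value>
                    System.out.println(entry.key + " -> " + entry.value);
                }
            }
        }
    }
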
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 7991b0d..8236c1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -198,13 +198,13 @@ public class Stores {
                                                                  final boolean retainDuplicates) throws IllegalArgumentException {
         Objects.requireNonNull(name, "name cannot be null");
         final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
         final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
-        ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
+        final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
 
-        final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
+        final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
 
-        return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval);
+        return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval);
     }
 
     private static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -264,8 +264,7 @@ public class Stores {
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final Duration retentionPeriod) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
-        return persistentSessionStore(name, retentionPeriod.toMillis());
+        return persistentSessionStore(name, ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix));
     }
 
 
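The Stores change above makes the Duration check return the validated millisecond value so it can be reused instead of calling Duration#toMillis() a second time. A standalone sketch of that pattern follows; validateMillis below is a hypothetical stand-in that only mirrors what the internal helper appears to do.

    import java.time.Duration;

    public final class DurationCheckSketch {

        // hypothetical mirror of the internal helper: validate once, return the millis
        static long validateMillis(final Duration duration, final String what) {
            if (duration == null) {
                throw new IllegalArgumentException(what + " cannot be null");
            }
            return duration.toMillis(); // may throw ArithmeticException for extreme durations
        }

        public static void main(final String[] args) {
            final long retentionMs = validateMillis(Duration.ofDays(1), "retentionPeriod");
            final long windowSizeMs = validateMillis(Duration.ofMinutes(5), "windowSize");
            // the validated values are reused, e.g. to derive a default segment interval
            final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
            System.out.println(windowSizeMs + " / " + defaultSegmentInterval);
        }
    }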
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
index 025f36c..a14b211 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -37,8 +37,8 @@ public class StreamsMetadata {
      * operations.
      */
     public final static StreamsMetadata NOT_AVAILABLE = new StreamsMetadata(new HostInfo("unavailable", -1),
-                                                                            Collections.<String>emptySet(),
-                                                                            Collections.<TopicPartition>emptySet());
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet());
 
     private final HostInfo hostInfo;
     private final Set<String> stateStoreNames;
@@ -68,19 +68,26 @@ public class StreamsMetadata {
     public String host() {
         return hostInfo.host();
     }
+
     public int port() {
         return hostInfo.port();
     }
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final StreamsMetadata that = (StreamsMetadata) o;
-
-        if (!hostInfo.equals(that.hostInfo)) return false;
-        if (!stateStoreNames.equals(that.stateStoreNames)) return false;
+        if (!hostInfo.equals(that.hostInfo)) {
+            return false;
+        }
+        if (!stateStoreNames.equals(that.stateStoreNames)) {
+            return false;
+        }
         return topicPartitions.equals(that.topicPartitions);
 
     }
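The StreamsMetadata.equals() rewrite above is purely stylistic: every branch now uses braces, matching the stricter checkstyle rules this commit enables. A self-contained sketch of the same shape on a hypothetical value class:

    import java.util.Objects;

    public final class HostPort {
        private final String host;
        private final int port;

        public HostPort(final String host, final int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final HostPort that = (HostPort) o;
            return port == that.port && Objects.equals(host, that.host);
        }

        @Override
        public int hashCode() {
            return Objects.hash(host, port);
        }
    }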
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
index c071b34..10b96e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.utils.Bytes;
 
 /**
- * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
+ * A store supplier that can be used to create one or more {@link WindowStore WindowStore&lt;Byte, byte[]&gt;} instances of type &lt;Byte, byte[]&gt;.
  *
- * For any stores implementing the {@link WindowStore WindowStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ * For any stores implementing the {@link WindowStore WindowStore&lt;Byte, byte[]&gt;} interface, null value bytes are considered as "not exist". This means:
  *
  * 1. Null value bytes in put operations should be treated as delete.
  * 2. Null value bytes should never be returned in range query results.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index cf5744b..fcbd004 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -63,7 +63,6 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * <p>
      * The time range is inclusive and applies to the starting timestamp of the window.
      * For example, if we have the following windows:
-     * <p>
      * <pre>
      * +-------------------------------+
      * |  key  | start time | end time |
@@ -78,7 +77,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * +--------------------------------
      * </pre>
      * And we call {@code store.fetch("A", 10, 20)} then the results will contain the first
-     * three windows from the table above, i.e., all those where 10 <= start time <= 20.
+     * three windows from the table above, i.e., all those where 10 &lt;= start time &lt;= 20.
      * <p>
      * For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
      * available window to the newest/latest window.
@@ -95,9 +94,10 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
 
     @Override
     default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
-        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
-        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+        return fetch(
+            key,
+            ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+            ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
     }
 
     /**
@@ -118,9 +118,11 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
 
     @Override
     default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
-        ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
-        ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+        return fetch(
+            from,
+            to,
+            ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
+            ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
     }
 
     /**
@@ -137,8 +139,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
 
     @Override
     default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
-        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
-        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+        return fetchAll(
+            ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+            ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
     }
 }
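The default methods above now hand the validated epoch milliseconds straight to the long-based primitives instead of validating and converting in separate steps. A trimmed-down sketch of that Instant-to-long delegation, on a hypothetical interface rather than the real WindowStore:

    import java.time.Instant;

    // hypothetical, simplified interface illustrating the delegation pattern
    interface TimeRangeReader<V> {

        // primitive variant that implementations provide
        Iterable<V> fetch(long fromMs, long toMs);

        // convenience overload: validate each Instant once and delegate
        default Iterable<V> fetch(final Instant from, final Instant to) {
            return fetch(toEpochMillis(from, "from"), toEpochMillis(to, "to"));
        }

        static long toEpochMillis(final Instant instant, final String name) {
            if (instant == null) {
                throw new IllegalArgumentException(name + " cannot be null");
            }
            return instant.toEpochMilli(); // may throw ArithmeticException for extreme instants
        }
    }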
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index fa37f5e..0908085 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -16,15 +16,15 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
+import java.time.Instant;
 import java.util.List;
 import java.util.Objects;
 
@@ -89,11 +89,13 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
         return KeyValueIterators.emptyWindowStoreIterator();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
-        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+        return fetch(
+            key,
+            ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+            ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
     }
 
     @SuppressWarnings("deprecation")
@@ -101,12 +103,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         Objects.requireNonNull(from, "from can't be null");
         Objects.requireNonNull(to, "to can't be null");
-        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
-            @Override
-            public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
-                return store.fetch(from, to, timeFrom, timeTo);
-            }
-        };
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetch(from, to, timeFrom, timeTo);
         return new DelegatingPeekingKeyValueIterator<>(storeName,
                                                        new CompositeKeyValueIterator<>(
                                                                provider.stores(storeName, windowStoreType).iterator(),
@@ -115,44 +112,37 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
-        ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+        return fetch(
+            from,
+            to,
+            ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
+            ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
-            @Override
-            public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
-                return store.all();
-            }
-        };
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = ReadOnlyWindowStore::all;
         return new DelegatingPeekingKeyValueIterator<>(storeName,
                 new CompositeKeyValueIterator<>(
                         provider.stores(storeName, windowStoreType).iterator(),
                         nextIteratorFunction));
     }
-    
+
     @Override
     @Deprecated
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
-        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
-            @Override
-            public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
-                return store.fetchAll(timeFrom, timeTo);
-            }
-        };
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetchAll(timeFrom, timeTo);
         return new DelegatingPeekingKeyValueIterator<>(storeName,
                 new CompositeKeyValueIterator<>(
                         provider.stores(storeName, windowStoreType).iterator(),
                         nextIteratorFunction));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
-        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+        return fetchAll(
+            ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+            ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
     }
 }
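NextIteratorFunction has a single abstract method, so the anonymous classes above collapse into lambdas, or into method references where the body only forwards a call. A self-contained sketch of the same transformation with a hypothetical interface and a plain List standing in for the store:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public final class LambdaVsAnonymous {

        // single-abstract-method interface, analogous to NextIteratorFunction
        interface NextIterator<T, S> {
            Iterator<T> apply(S store);
        }

        public static void main(final String[] args) {
            final List<String> store = Arrays.asList("a", "b", "c");

            // before: anonymous inner class
            final NextIterator<String, List<String>> verbose = new NextIterator<String, List<String>>() {
                @Override
                public Iterator<String> apply(final List<String> s) {
                    return s.iterator();
                }
            };

            // after: a lambda, or a method reference where the lambda only forwards
            final NextIterator<String, List<String>> concise = s -> s.iterator();
            final NextIterator<String, List<String>> reference = List<String>::iterator;

            System.out.println(verbose.apply(store).next());   // a
            System.out.println(concise.apply(store).next());   // a
            System.out.println(reference.apply(store).next()); // a
        }
    }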
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 89935c0..870eeee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -38,14 +38,18 @@ public class ContextualRecord {
         return value;
     }
 
-    public long sizeBytes() {
+    long sizeBytes() {
         return (value == null ? 0 : value.length) + recordContext.sizeBytes();
     }
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final ContextualRecord that = (ContextualRecord) o;
         return Arrays.equals(value, that.value) &&
             Objects.equals(recordContext, that.recordContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 234ea05..65f5388 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -141,8 +141,12 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
 
         @Override
         public boolean equals(final Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
             final BufferKey bufferKey = (BufferKey) o;
             return time == bufferKey.time &&
                 Objects.equals(key, bufferKey.key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 5343635..f454862 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -75,8 +75,12 @@ class LRUCacheEntry {
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final LRUCacheEntry that = (LRUCacheEntry) o;
         return sizeBytes == that.sizeBytes &&
             isDirty() == that.isDirty() &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 582eb46..80078b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -108,18 +107,15 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         // register the store
-        context.register(root, new StateRestoreCallback() {
-            @Override
-            public void restore(final byte[] key, final byte[] value) {
-                restoring = true;
-                // check value for null, to avoid  deserialization error.
-                if (value == null) {
-                    delete(serdes.keyFrom(key));
-                } else {
-                    put(serdes.keyFrom(key), serdes.valueFrom(value));
-                }
-                restoring = false;
+        context.register(root, (key, value) -> {
+            restoring = true;
+            // check value for null, to avoid a deserialization error.
+            if (value == null) {
+                delete(serdes.keyFrom(key));
+            } else {
+                put(serdes.keyFrom(key), serdes.valueFrom(value));
             }
+            restoring = false;
         });
     }
 
@@ -162,8 +158,9 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public void putAll(final List<KeyValue<K, V>> entries) {
-        for (final KeyValue<K, V> entry : entries)
+        for (final KeyValue<K, V> entry : entries) {
             put(entry.key, entry.value);
+        }
     }
 
     @Override
@@ -173,7 +170,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException at every invocation
      */
     @Override
     public KeyValueIterator<K, V> range(final K from, final K to) {
@@ -181,7 +178,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException at every invocation
      */
     @Override
     public KeyValueIterator<K, V> all() {
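StateRestoreCallback is likewise a single-method interface, so the registration above shrinks to a lambda that treats null value bytes as deletes. A rough, self-contained sketch of that callback shape, using a hypothetical interface and an in-memory map instead of the real store:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public final class RestoreCallbackSketch {

        // hypothetical stand-in for org.apache.kafka.streams.processor.StateRestoreCallback
        interface RestoreCallback {
            void restore(byte[] key, byte[] value);
        }

        public static void main(final String[] args) {
            final Map<String, String> store = new HashMap<>();

            // lambda form of the callback: null value bytes are interpreted as deletes
            final RestoreCallback callback = (key, value) -> {
                final String k = new String(key, StandardCharsets.UTF_8);
                if (value == null) {
                    store.remove(k);
                } else {
                    store.put(k, new String(value, StandardCharsets.UTF_8));
                }
            };

            callback.restore("a".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8));
            callback.restore("a".getBytes(StandardCharsets.UTF_8), null);
            System.out.println(store); // {}
        }
    }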
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index bf748fc..8c28115 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -78,7 +78,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
     private final String name;
     private final String parentDir;
-    private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
+    private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
 
     File dbDir;
     private RocksDB db;
@@ -516,10 +516,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
             if (next == null) {
                 return allDone();
             } else {
-                if (comparator.compare(next.key.get(), rawToKey) <= 0)
+                if (comparator.compare(next.key.get(), rawToKey) <= 0) {
                     return next;
-                else
+                } else {
                     return allDone();
+                }
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index f95a104..f80ffa3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -54,10 +54,11 @@ class Segment extends RocksDBStore implements Comparable<Segment> {
 
     @Override
     public boolean equals(final Object obj) {
-        if (obj == null || getClass() != obj.getClass()) return false;
-
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
         final Segment segment = (Segment) obj;
-        return Long.compare(id, segment.id) == 0;
+        return id == segment.id;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index cf858b0..d4aedce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -110,8 +110,9 @@ class Segments {
                 final String[] list = dir.list();
                 if (list != null) {
                     final long[] segmentIds = new long[list.length];
-                    for (int i = 0; i < list.length; i++)
+                    for (int i = 0; i < list.length; i++) {
                         segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
+                    }
 
                     // open segments in the id order
                     Arrays.sort(segmentIds);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 0c1a99d..a3baebb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -74,8 +73,9 @@ public abstract class AbstractJoinIntegrationTest {
     @Parameterized.Parameters(name = "caching enabled = {0}")
     public static Collection<Object[]> data() {
         final List<Object[]> values = new ArrayList<>();
-        for (final boolean cacheEnabled : Arrays.asList(true, false))
-            values.add(new Object[] {cacheEnabled});
+        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+            values.add(new Object[]{cacheEnabled});
+        }
         return values;
     }
 
@@ -99,29 +99,24 @@ public abstract class AbstractJoinIntegrationTest {
     AtomicBoolean finalResultReached = new AtomicBoolean(false);
 
     private final List<Input<String>> input = Arrays.asList(
-            new Input<>(INPUT_TOPIC_LEFT, (String) null),
-            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, null),
+            new Input<>(INPUT_TOPIC_RIGHT, null),
             new Input<>(INPUT_TOPIC_LEFT, "A"),
             new Input<>(INPUT_TOPIC_RIGHT, "a"),
             new Input<>(INPUT_TOPIC_LEFT, "B"),
             new Input<>(INPUT_TOPIC_RIGHT, "b"),
-            new Input<>(INPUT_TOPIC_LEFT, (String) null),
-            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, null),
+            new Input<>(INPUT_TOPIC_RIGHT, null),
             new Input<>(INPUT_TOPIC_LEFT, "C"),
             new Input<>(INPUT_TOPIC_RIGHT, "c"),
-            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
-            new Input<>(INPUT_TOPIC_LEFT, (String) null),
-            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, null),
+            new Input<>(INPUT_TOPIC_LEFT, null),
+            new Input<>(INPUT_TOPIC_RIGHT, null),
             new Input<>(INPUT_TOPIC_RIGHT, "d"),
             new Input<>(INPUT_TOPIC_LEFT, "D")
     );
 
-    final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(final String value1, final String value2) {
-            return value1 + "-" + value2;
-        }
-    };
+    final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
 
     final boolean cacheEnabled;
 
@@ -154,8 +149,9 @@ public abstract class AbstractJoinIntegrationTest {
     void prepareEnvironment() throws InterruptedException {
         CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
 
-        if (!cacheEnabled)
+        if (!cacheEnabled) {
             STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        }
 
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
 
@@ -247,12 +243,7 @@ public abstract class AbstractJoinIntegrationTest {
                 producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
             }
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return finalResultReached.get();
-                }
-            }, "Never received expected final result.");
+            TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result.");
 
             checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
 
@@ -268,7 +259,7 @@ public abstract class AbstractJoinIntegrationTest {
      * Checks the embedded queryable state store snapshot
      */
     private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
-        final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.<Long, String>keyValueStore());
+        final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.keyValueStore());
 
         final KeyValueIterator<Long, String> all = store.all();
         final KeyValue<Long, String> onlyEntry = all.next();
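The joiner above shows how much boilerplate the lambda form removes: ValueJoiner declares a single apply method, so the whole anonymous class becomes one line. A small, compilable sketch (requires kafka-streams on the classpath); the sample values are hypothetical:

    import org.apache.kafka.streams.kstream.ValueJoiner;

    public final class ValueJoinerSketch {
        public static void main(final String[] args) {
            // lambda form of the joiner used in the test: concatenate both sides
            final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1 + "-" + value2;

            System.out.println(joiner.apply("A", "a"));  // A-a
            // a null side (as produced by outer joins) is rendered as the literal "null"
            System.out.println(joiner.apply("D", null)); // D-null
        }
    }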
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index be87eb2..2873593 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -141,9 +141,8 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
-
         final Serde<String> stringSerde = Serdes.String();
-        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
+        final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
         CLUSTER.createTopic("TEST-TOPIC-1");
@@ -167,23 +166,12 @@ public class RegexSourceIntegrationTest {
             }
         });
 
-
         streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedFirstAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedSecondAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
     }
 
@@ -192,7 +180,7 @@ public class RegexSourceIntegrationTest {
 
         final Serde<String> stringSerde = Serdes.String();
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
-        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
+        final List<String> expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B");
 
         CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
 
@@ -218,21 +206,11 @@ public class RegexSourceIntegrationTest {
 
 
         streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedFirstAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedSecondAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
     }
 
     @Test
@@ -252,13 +230,10 @@ public class RegexSourceIntegrationTest {
         try {
             streams.start();
 
-            final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
-                    final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
-                    return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
-                }
+            final TestCondition stateStoreNameBoundToSourceTopic = () -> {
+                final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
+                final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
+                return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
             };
 
             TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
@@ -296,12 +271,12 @@ public class RegexSourceIntegrationTest {
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singleton(topic1TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singleton(topic2TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singleton(topicATestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singleton(topicCTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singleton(topicYTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singleton(topicZTestMessage), producerConfig, mockTime);
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
@@ -364,15 +339,9 @@ public class RegexSourceIntegrationTest {
                 }
             });
 
-
             partitionedStreamsLeader.start();
             partitionedStreamsFollower.start();
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment);
-                }
-            }, "topic assignment not completed");
+            TestUtils.waitForCondition(() -> followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment), "topic assignment not completed");
         } finally {
             if (partitionedStreamsLeader != null) {
                 partitionedStreamsLeader.close();
@@ -402,19 +371,17 @@ public class RegexSourceIntegrationTest {
         final AtomicBoolean expectError = new AtomicBoolean(false);
 
         streams = new KafkaStreams(builder.build(), streamsConfiguration);
-        streams.setStateListener(new KafkaStreams.StateListener() {
-            @Override
-            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-                if (newState == KafkaStreams.State.ERROR)
-                    expectError.set(true);
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.ERROR) {
+                expectError.set(true);
             }
         });
         streams.start();
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Collections.singleton(fMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Collections.singleton(fooMessage), producerConfig, mockTime);
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
         try {
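Several assignments above also switch from Arrays.asList to Collections.singletonList and Collections.singleton for one-element collections, which are immutable and slightly cheaper. A quick sketch of the difference (the element values are just placeholders):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    public final class SingletonCollectionsSketch {
        public static void main(final String[] args) {
            // Arrays.asList builds an array-backed list even for one element
            final List<String> viaAsList = Arrays.asList("TEST-TOPIC-1");
            // Collections.singletonList / singleton are immutable single-element views
            final List<String> viaSingletonList = Collections.singletonList("TEST-TOPIC-1");
            final Set<String> viaSingleton = Collections.singleton("only-message");

            System.out.println(viaAsList.equals(viaSingletonList)); // true: equal by content
            System.out.println(viaSingleton.size());                // 1
        }
    }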
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 31a349b..830514b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -37,13 +36,10 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -62,6 +58,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -143,21 +140,16 @@ public class RestoreIntegrationTest {
 
         builder.table(INPUT_STREAM, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Integer()))
                 .toStream()
-                .foreach(new ForeachAction<Integer, Integer>() {
-                    @Override
-                    public void apply(final Integer key, final Integer value) {
-                        if (numReceived.incrementAndGet() == 2 * offsetLimitDelta)
-                            shutdownLatch.countDown();
+                .foreach((key, value) -> {
+                    if (numReceived.incrementAndGet() == 2 * offsetLimitDelta) {
+                        shutdownLatch.countDown();
                     }
                 });
 
         kafkaStreams = new KafkaStreams(builder.build(), props);
-        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
-            @Override
-            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                    startupLatch.countDown();
-                }
+        kafkaStreams.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                startupLatch.countDown();
             }
         });
 
@@ -210,21 +202,16 @@ public class RestoreIntegrationTest {
 
         builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
                 .toStream()
-                .foreach(new ForeachAction<Integer, Integer>() {
-                    @Override
-                    public void apply(final Integer key, final Integer value) {
-                        if (numReceived.incrementAndGet() == numberOfKeys)
-                            shutdownLatch.countDown();
+                .foreach((key, value) -> {
+                    if (numReceived.incrementAndGet() == numberOfKeys) {
+                        shutdownLatch.countDown();
                     }
                 });
 
         kafkaStreams = new KafkaStreams(builder.build(), props);
-        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
-            @Override
-            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                    startupLatch.countDown();
-                }
+        kafkaStreams.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                startupLatch.countDown();
             }
         });
 
@@ -261,21 +248,15 @@ public class RestoreIntegrationTest {
 
         final KStream<Integer, Integer> stream = builder.stream(INPUT_STREAM);
         stream.groupByKey()
-                .reduce(new Reducer<Integer>() {
-                    @Override
-                    public Integer apply(final Integer value1, final Integer value2) {
-                        return value1 + value2;
-                    }
-                }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
+                .reduce(
+                    (value1, value2) -> value1 + value2,
+                    Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
         kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
-        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
-            @Override
-            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                    startupLatch.countDown();
-                }
+        kafkaStreams.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                startupLatch.countDown();
             }
         });
 
@@ -310,24 +291,16 @@ public class RestoreIntegrationTest {
 
         final KStream<Integer, Integer> stream = streamsBuilder.stream(INPUT_STREAM_2);
         final CountDownLatch processorLatch = new CountDownLatch(3);
-        stream.process(new ProcessorSupplier<Integer, Integer>() {
-            @Override
-            public Processor<Integer, Integer> get() {
-                return new KeyValueStoreProcessor(INPUT_STREAM_2, processorLatch);
-            }
-        }, INPUT_STREAM_2);
+        stream.process(() -> new KeyValueStoreProcessor(INPUT_STREAM_2, processorLatch), INPUT_STREAM_2);
 
         final Topology topology = streamsBuilder.build();
 
         kafkaStreams = new KafkaStreams(topology, props(APPID + "-logging-disabled"));
 
         final CountDownLatch latch = new CountDownLatch(1);
-        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
-            @Override
-            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                    latch.countDown();
-                }
+        kafkaStreams.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                latch.countDown();
             }
         });
         kafkaStreams.start();
@@ -341,12 +314,12 @@ public class RestoreIntegrationTest {
 
     public static class KeyValueStoreProcessor implements Processor<Integer, Integer> {
 
-        private String topic;
+        private final String topic;
         private final CountDownLatch processorLatch;
 
         private KeyValueStore<Integer, Integer> store;
 
-        public KeyValueStoreProcessor(final String topic, final CountDownLatch processorLatch) {
+        KeyValueStoreProcessor(final String topic, final CountDownLatch processorLatch) {
             this.topic = topic;
             this.processorLatch = processorLatch;
         }
@@ -366,9 +339,7 @@ public class RestoreIntegrationTest {
         }
 
         @Override
-        public void close() {
-
-        }
+        public void close() { }
     }
 
     private void createStateForRestoration(final String changelogTopic) {
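The repeated StateListener refactor above follows the same pattern in every test: wait for the REBALANCING to RUNNING transition before producing or querying. A sketch of that helper in isolation; it is a fragment, since the KafkaStreams instance must be built elsewhere from a Topology and Properties:

    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.streams.KafkaStreams;

    public final class StateListenerSketch {

        // 'streams' must be a configured, not-yet-started KafkaStreams instance
        static void startAndAwaitRunning(final KafkaStreams streams) throws InterruptedException {
            final CountDownLatch startupLatch = new CountDownLatch(1);

            // lambda form of KafkaStreams.StateListener, as used in the tests above
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    startupLatch.countDown();
                }
            });

            streams.start();
            startupLatch.await();
        }
    }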
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index badc63c..eeee3bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -55,7 +55,7 @@ public class KafkaEmbedded {
 
     private final Properties effectiveConfig;
     private final File logDir;
-    public final TemporaryFolder tmpFolder;
+    private final TemporaryFolder tmpFolder;
     private final KafkaServer kafka;
 
     /**
@@ -65,6 +65,7 @@ public class KafkaEmbedded {
      *               broker should listen to.  Note that you cannot change the `log.dirs` setting
      *               currently.
      */
+    @SuppressWarnings("WeakerAccess")
     public KafkaEmbedded(final Properties config, final MockTime time) throws IOException {
         tmpFolder = new TemporaryFolder();
         tmpFolder.create();
@@ -79,16 +80,13 @@ public class KafkaEmbedded {
             brokerList(), zookeeperConnect());
     }
 
-
     /**
      * Creates the configuration for starting the Kafka broker by merging default values with
      * overwrites.
      *
      * @param initialConfig Broker configuration settings that override the default config.
-     * @return
-     * @throws IOException
      */
-    private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
+    private Properties effectiveConfigFrom(final Properties initialConfig) {
         final Properties effectiveConfig = new Properties();
         effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
         effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "localhost");
@@ -109,6 +107,7 @@ public class KafkaEmbedded {
      * <p>
      * You can use this to tell Kafka producers and consumers how to connect to this instance.
      */
+    @SuppressWarnings("WeakerAccess")
     public String brokerList() {
         final Object listenerConfig = effectiveConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
         return kafka.config().hostName() + ":" + kafka.boundPort(
@@ -119,6 +118,7 @@ public class KafkaEmbedded {
     /**
      * The ZooKeeper connection string aka `zookeeper.connect`.
      */
+    @SuppressWarnings("WeakerAccess")
     public String zookeeperConnect() {
         return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
     }
@@ -126,6 +126,7 @@ public class KafkaEmbedded {
     /**
      * Stop the broker.
      */
+    @SuppressWarnings("WeakerAccess")
     public void stop() {
         log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
@@ -186,6 +187,7 @@ public class KafkaEmbedded {
         }
     }
 
+    @SuppressWarnings("WeakerAccess")
     public AdminClient createAdminClient() {
         final Properties adminClientConfig = new Properties();
         adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
@@ -198,16 +200,19 @@ public class KafkaEmbedded {
         return AdminClient.create(adminClientConfig);
     }
 
+    @SuppressWarnings("WeakerAccess")
     public void deleteTopic(final String topic) {
         log.debug("Deleting topic { name: {} }", topic);
         try (final AdminClient adminClient = createAdminClient()) {
             adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
         } catch (final InterruptedException | ExecutionException e) {
-            if (!(e.getCause() instanceof UnknownTopicOrPartitionException))
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                 throw new RuntimeException(e);
+            }
         }
     }
 
+    @SuppressWarnings("WeakerAccess")
     public KafkaServer kafkaServer() {
         return kafka;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index f4ede7c..425f837 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -110,13 +110,13 @@ public class AbstractStreamTest {
                 new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
                 false);
             builder.addGraphNode(this.streamsGraphNode, processorNode);
-            return new KStreamImpl<K, V>(name, null, null, sourceNodes, false, processorNode, builder);
+            return new KStreamImpl<>(name, null, null, sourceNodes, false, processorNode, builder);
         }
     }
 
     private class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V> {
 
-        private Random rand;
+        private final Random rand;
 
         ExtendedKStreamDummy() {
             rand = new Random();
@@ -131,8 +131,9 @@ public class AbstractStreamTest {
             @Override
             public void process(final K key, final V value) {
                 // flip a coin and filter
-                if (rand.nextBoolean())
+                if (rand.nextBoolean()) {
                     context().forward(key, value);
+                }
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index ec7f2fc..8607902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.perf;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.time.Duration;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -27,23 +26,18 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.state.WindowStore;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -68,23 +62,26 @@ public class YahooBenchmark {
     static class ProjectedEvent {
         /* attributes need to be public for serializer to work */
         /* main attributes */
-        public String eventType;
-        public String adID;
+        String eventType;
+        String adID;
 
         /* other attributes */
-        public long eventTime;
-        public String userID = UUID.randomUUID().toString(); // not used
-        public String pageID = UUID.randomUUID().toString(); // not used
-        public String addType = "banner78";  // not used
-        public String ipAddress = "1.2.3.4"; // not used
+        long eventTime;
+        /* not used
+        public String userID = UUID.randomUUID().toString();
+        public String pageID = UUID.randomUUID().toString();
+        public String addType = "banner78";
+        public String ipAddress = "1.2.3.4";
+         */
     }
 
     static class CampaignAd {
         /* attributes need to be public for serializer to work */
-        public String adID;
-        public String campaignID;
+        String adID;
+        String campaignID;
     }
 
+    @SuppressWarnings("WeakerAccess")
     public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
         this.parent = parent;
         this.campaignsTopic = campaignsTopic;
@@ -188,17 +185,17 @@ public class YahooBenchmark {
         /**
          * Default constructor needed by Kafka
          */
-        public JsonPOJOSerializer() {
-        }
+        @SuppressWarnings("WeakerAccess")
+        public JsonPOJOSerializer() {}
 
         @Override
-        public void configure(final Map<String, ?> props, final boolean isKey) {
-        }
+        public void configure(final Map<String, ?> props, final boolean isKey) {}
 
         @Override
         public byte[] serialize(final String topic, final T data) {
-            if (data == null)
+            if (data == null) {
                 return null;
+            }
 
             try {
                 return objectMapper.writeValueAsBytes(data);
@@ -208,22 +205,21 @@ public class YahooBenchmark {
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
 
     }
 
     // Note: these are also in the streams example package, eventually use 1 file
     private class JsonPOJODeserializer<T> implements Deserializer<T> {
-        private ObjectMapper objectMapper = new ObjectMapper();
+        private final ObjectMapper objectMapper = new ObjectMapper();
 
         private Class<T> tClass;
 
         /**
          * Default constructor needed by Kafka
          */
-        public JsonPOJODeserializer() {
-        }
+        @SuppressWarnings("WeakerAccess")
+        public JsonPOJODeserializer() {}
 
         @SuppressWarnings("unchecked")
         @Override
@@ -233,8 +229,9 @@ public class YahooBenchmark {
 
         @Override
         public T deserialize(final String topic, final byte[] bytes) {
-            if (bytes == null)
+            if (bytes == null) {
                 return null;
+            }
 
             final T data;
             try {
@@ -268,50 +265,35 @@ public class YahooBenchmark {
                                                                                      Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
         final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
 
-
         final KStream<String, ProjectedEvent> filteredEvents = kEvents
             // use peek to quit when the last element is processed
-            .peek(new ForeachAction<String, ProjectedEvent>() {
-                @Override
-                public void apply(final String key, final ProjectedEvent value) {
-                    parent.processedRecords++;
-                    if (parent.processedRecords % 1000000 == 0) {
-                        System.out.println("Processed " + parent.processedRecords);
-                    }
-                    if (parent.processedRecords >= numRecords) {
-                        latch.countDown();
-                    }
+            .peek((key, value) -> {
+                parent.processedRecords++;
+                if (parent.processedRecords % 1000000 == 0) {
+                    System.out.println("Processed " + parent.processedRecords);
                 }
-            })
-            // only keep "view" events
-            .filter(new Predicate<String, ProjectedEvent>() {
-                @Override
-                public boolean test(final String key, final ProjectedEvent value) {
-                    return value.eventType.equals("view");
+                if (parent.processedRecords >= numRecords) {
+                    latch.countDown();
                 }
             })
+            // only keep "view" events
+            .filter((key, value) -> value.eventType.equals("view"))
             // select just a few of the columns
-            .mapValues(new ValueMapper<ProjectedEvent, ProjectedEvent>() {
-                @Override
-                public ProjectedEvent apply(final ProjectedEvent value) {
-                    final ProjectedEvent event = new ProjectedEvent();
-                    event.adID = value.adID;
-                    event.eventTime = value.eventTime;
-                    event.eventType = value.eventType;
-                    return event;
-                }
+            .mapValues(value -> {
+                final ProjectedEvent event = new ProjectedEvent();
+                event.adID = value.adID;
+                event.eventTime = value.eventTime;
+                event.eventType = value.eventType;
+                return event;
             });
 
         // deserialize the ad ID and campaign ID from the stored value in Kafka
-        final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(new ValueMapper<String, CampaignAd>() {
-            @Override
-            public CampaignAd apply(final String value) {
-                final String[] parts = value.split(":");
-                final CampaignAd cAdd = new CampaignAd();
-                cAdd.adID = parts[0];
-                cAdd.campaignID = parts[1];
-                return cAdd;
-            }
+        final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(value -> {
+            final String[] parts = value.split(":");
+            final CampaignAd cAdd = new CampaignAd();
+            cAdd.adID = parts[0];
+            cAdd.campaignID = parts[1];
+            return cAdd;
         });
 
         // join the events with the campaigns
@@ -321,21 +303,15 @@ public class YahooBenchmark {
             Joined.with(Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), null)
         );
 
-
         // key by campaign rather than by ad, as in the original benchmark
         final KStream<String, String> keyedByCampaign = joined
-            .selectKey(new KeyValueMapper<String, String, String>() {
-                @Override
-                public String apply(final String key, final String value) {
-                    return value;
-                }
-            });
+            .selectKey((key, value) -> value);
 
         // calculate windowed counts
         keyedByCampaign
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000)))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windows"));
+            .count(Materialized.as("time-windows"));
 
         return new KafkaStreams(builder.build(), streamConfig);
     }
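
The YahooBenchmark changes above are almost entirely mechanical: each single-method anonymous class (ForeachAction, Predicate, ValueMapper, KeyValueMapper) becomes the equivalent lambda, and generic type witnesses such as Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(...) are dropped where the compiler can infer them. A minimal sketch of the before/after filter pattern follows; the class name LambdaCleanupSketch and the topic name "events" are illustrative only and not part of the commit.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Predicate;

    public class LambdaCleanupSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> events = builder.stream("events");

            // before: the single-method Predicate interface implemented as an anonymous class
            final KStream<String, String> viewsBefore = events.filter(new Predicate<String, String>() {
                @Override
                public boolean test(final String key, final String value) {
                    return value.startsWith("view");
                }
            });

            // after: the equivalent lambda, as used throughout this commit; behavior is unchanged
            final KStream<String, String> viewsAfter = events.filter((key, value) -> value.startsWith("view"));

            builder.build(); // builds the topology only; no KafkaStreams instance is started in this sketch
        }
    }
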
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 14b94da..6a7bd02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -677,11 +677,13 @@ public class ProcessorTopologyTest {
 
         @Override
         public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
-            if (record.value().toString().matches(".*@[0-9]+"))
+            if (record.value().toString().matches(".*@[0-9]+")) {
                 return Long.parseLong(record.value().toString().split("@")[1]);
+            }
 
-            if (record.timestamp() >= 0L)
+            if (record.timestamp() >= 0L) {
                 return record.timestamp();
+            }
 
             return DEFAULT_TIMESTAMP;
         }
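
The ProcessorTopologyTest hunk shows the formatting rule that repeats through the rest of the diff: single-statement if bodies gain braces, presumably enforced by a checkstyle rule along the lines of NeedBraces. Below is a tiny sketch of why the braced form is safer; the helper name clampTimestamp and its values are made up for illustration.

    public class BraceStyleSketch {
        // before: "if (timestamp >= 0L) return timestamp;" compiles, but silently misbehaves
        // if a second statement intended to be guarded by the if is added later without braces
        static long clampTimestamp(final long timestamp, final long fallback) {
            if (timestamp >= 0L) {
                return timestamp;
            }
            return fallback;
        }

        public static void main(final String[] args) {
            System.out.println(clampTimestamp(-1L, 42L)); // prints 42
        }
    }
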
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 7c5abbe..140c219 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -39,11 +38,10 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Test;
@@ -103,8 +101,8 @@ public class StreamsPartitionAssignorTest {
         "cluster",
         Collections.singletonList(Node.noNode()),
         infos,
-        Collections.<String>emptySet(),
-        Collections.<String>emptySet());
+        Collections.emptySet(),
+        Collections.emptySet());
 
     private final TaskId task0 = new TaskId(0, 0);
     private final TaskId task1 = new TaskId(0, 1);
@@ -187,7 +185,7 @@ public class StreamsPartitionAssignorTest {
         final UUID processId = UUID.randomUUID();
         mockTaskManager(prevTasks, cachedTasks, processId, builder);
 
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
 
         Collections.sort(subscription.topics());
@@ -219,7 +217,7 @@ public class StreamsPartitionAssignorTest {
         final UUID uuid2 = UUID.randomUUID();
 
         mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
@@ -284,8 +282,8 @@ public class StreamsPartitionAssignorTest {
             "cluster",
             Collections.singletonList(Node.noNode()),
             localInfos,
-            Collections.<String>emptySet(),
-            Collections.<String>emptySet());
+            Collections.emptySet(),
+            Collections.emptySet());
 
         final List<String> topics = asList("topic1", "topic2");
 
@@ -301,16 +299,16 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
 
-        mockTaskManager(new HashSet<TaskId>(), new HashSet<TaskId>(), uuid1, builder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        mockTaskManager(new HashSet<>(), new HashSet<>(), uuid1, builder);
+        configurePartitionAssignor(Collections.emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
 
@@ -344,13 +342,13 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
 
-        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
-        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, (Object) SingleGroupPartitionGrouperStub.class));
+        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class));
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
 
 
         // will throw exception if it fails
@@ -376,13 +374,13 @@ public class StreamsPartitionAssignorTest {
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
         final  Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()),
-            Collections.<PartitionInfo>emptySet(),
-            Collections.<String>emptySet(),
-            Collections.<String>emptySet());
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet());
         final UUID uuid1 = UUID.randomUUID();
 
         mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -392,15 +390,15 @@ public class StreamsPartitionAssignorTest {
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);
 
         // check assigned partitions
-        assertEquals(Collections.<TopicPartition>emptySet(),
+        assertEquals(Collections.emptySet(),
             new HashSet<>(assignments.get("consumer10").partitions()));
 
         // check assignment info
-        AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
+        AssignmentInfo info10 = checkAssignment(Collections.emptySet(), assignments.get("consumer10"));
         final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         assertEquals(0, allActiveTasks.size());
-        assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
+        assertEquals(Collections.emptySet(), new HashSet<>(allActiveTasks));
 
         // then metadata gets populated
         assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -435,18 +433,18 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
         final UUID uuid2 = UUID.randomUUID();
-        mockTaskManager(prevTasks10, Collections.<TaskId>emptySet(), uuid1, builder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        mockTaskManager(prevTasks10, Collections.emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet(), userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -496,21 +494,21 @@ public class StreamsPartitionAssignorTest {
         final UUID uuid2 = UUID.randomUUID();
 
         mockTaskManager(
-            Collections.<TaskId>emptySet(),
-            Collections.<TaskId>emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
             uuid1,
             builder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -552,8 +550,9 @@ public class StreamsPartitionAssignorTest {
 
             if (stateChangelogTopics.contains(changelogTopic)) {
                 for (final TaskId id : tasks) {
-                    if (id.topicGroupId == entry.getKey())
+                    if (id.topicGroupId == entry.getKey()) {
                         ids.add(id);
+                    }
                 }
             }
         }
@@ -585,7 +584,7 @@ public class StreamsPartitionAssignorTest {
 
         mockTaskManager(prevTasks00, standbyTasks01, uuid1, builder);
 
-        configurePartitionAssignor(Collections.<String, Object>singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
@@ -631,7 +630,7 @@ public class StreamsPartitionAssignorTest {
 
     @Test
     public void testOnAssignment() {
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
 
         final List<TaskId> activeTaskList = asList(task0, task3);
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -677,8 +676,8 @@ public class StreamsPartitionAssignorTest {
         final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final UUID uuid1 = UUID.randomUUID();
-        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.emptyMap());
         final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
@@ -711,9 +710,9 @@ public class StreamsPartitionAssignorTest {
         final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final UUID uuid1 = UUID.randomUUID();
-        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
+        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
 
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
         final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
@@ -737,23 +736,13 @@ public class StreamsPartitionAssignorTest {
         final KStream<Object, Object> stream1 = builder
             .stream("topic1")
             // force creation of internal repartition topic
-            .map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
-                @Override
-                public KeyValue<Object, Object> apply(final Object key, final Object value) {
-                    return new KeyValue<>(key, value);
-                }
-            });
+            .map((KeyValueMapper<Object, Object, KeyValue<Object, Object>>) KeyValue::new);
 
         // KTable with 4 partitions
         final KTable<Object, Long> table1 = builder
             .table("topic3")
             // force creation of internal repartition topic
-            .groupBy(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
-                @Override
-                public KeyValue<Object, Object> apply(final Object key, final Object value) {
-                    return new KeyValue<>(key, value);
-                }
-            })
+            .groupBy(KeyValue::new)
             .count();
 
         // joining the stream and the table
@@ -761,12 +750,7 @@ public class StreamsPartitionAssignorTest {
         // forcing the stream.map to get repartitioned to a topic with four partitions.
         stream1.join(
             table1,
-            new ValueJoiner() {
-                @Override
-                public Object apply(final Object value1, final Object value2) {
-                    return null;
-                }
-            });
+            (ValueJoiner) (value1, value2) -> null);
 
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
@@ -774,11 +758,11 @@ public class StreamsPartitionAssignorTest {
         internalTopologyBuilder.setApplicationId(applicationId);
 
         mockTaskManager(
-            Collections.<TaskId>emptySet(),
-            Collections.<TaskId>emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
             UUID.randomUUID(),
             internalTopologyBuilder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
 
         final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
             streamsConfig,
@@ -837,11 +821,11 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
         mockTaskManager(
-            Collections.<TaskId>emptySet(),
-            Collections.<TaskId>emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
             uuid1,
             builder);
-        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
         final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
         assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
@@ -854,12 +838,12 @@ public class StreamsPartitionAssignorTest {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         builder.addSink("sink", "output", null, null, null, "processor");
 
-        final List<String> topics = asList("topic1");
+        final List<String> topics = Collections.singletonList("topic1");
 
         final UUID uuid1 = UUID.randomUUID();
 
-        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
-        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
+        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
@@ -884,11 +868,11 @@ public class StreamsPartitionAssignorTest {
     public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         builder.setApplicationId(applicationId);
 
-        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), UUID.randomUUID(), builder);
+        mockTaskManager(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), builder);
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         try {
-            configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost"));
+            configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost"));
             fail("expected to an exception due to invalid config");
         } catch (final ConfigException e) {
             // pass
@@ -900,7 +884,7 @@ public class StreamsPartitionAssignorTest {
         builder.setApplicationId(applicationId);
 
         try {
-            configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk"));
+            configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:j87yhk"));
             fail("expected to an exception due to invalid config");
         } catch (final ConfigException e) {
             // pass
@@ -911,34 +895,22 @@ public class StreamsPartitionAssignorTest {
     public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-
-
         final KStream<Object, Object> stream1 = builder
 
             // Task 1 (should get created):
             .stream("topic1")
             // force repartitioning for aggregation
-            .selectKey(new KeyValueMapper<Object, Object, Object>() {
-                @Override
-                public Object apply(final Object key, final Object value) {
-                    return null;
-                }
-            })
+            .selectKey((key, value) -> null)
             .groupByKey()
 
             // Task 2 (should get created):
             // create repartitioning and changelog topics as task 1 exists
-            .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count"))
+            .count(Materialized.as("count"))
 
             // force repartitioning for join, but second join input topic unknown
             // -> internal repartitioning topic should not get created
             .toStream()
-            .map(new KeyValueMapper<Object, Long, KeyValue<Object, Object>>() {
-                @Override
-                public KeyValue<Object, Object> apply(final Object key, final Long value) {
-                    return null;
-                }
-            });
+            .map((KeyValueMapper<Object, Long, KeyValue<Object, Object>>) (key, value) -> null);
 
         builder
             // Task 3 (should not get created because input topic unknown)
@@ -946,23 +918,13 @@ public class StreamsPartitionAssignorTest {
 
             // force repartitioning for join, but input topic unknown
             // -> thus should not create internal repartitioning topic
-            .selectKey(new KeyValueMapper<Object, Object, Object>() {
-                @Override
-                public Object apply(final Object key, final Object value) {
-                    return null;
-                }
-            })
+            .selectKey((key, value) -> null)
 
             // Task 4 (should not get created because input topics unknown)
             // should not create either of the two input repartition topics or either of the two changelog topics
             .join(
                 stream1,
-                new ValueJoiner() {
-                    @Override
-                    public Object apply(final Object value1, final Object value2) {
-                        return null;
-                    }
-                },
+                (ValueJoiner) (value1, value2) -> null,
                 JoinWindows.of(ofMillis(0))
             );
 
@@ -973,11 +935,11 @@ public class StreamsPartitionAssignorTest {
         internalTopologyBuilder.setApplicationId(applicationId);
 
         mockTaskManager(
-            Collections.<TaskId>emptySet(),
-            Collections.<TaskId>emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
             UUID.randomUUID(),
             internalTopologyBuilder);
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
 
         final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
             streamsConfig,
@@ -1008,7 +970,7 @@ public class StreamsPartitionAssignorTest {
         final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
                 new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo));
 
-        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+        configurePartitionAssignor(Collections.emptyMap());
 
         taskManager.setPartitionsByHostState(hostState);
         EasyMock.expectLastCall();
@@ -1030,8 +992,8 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid = UUID.randomUUID();
         mockTaskManager(
-            Collections.<TaskId>emptySet(),
-            Collections.<TaskId>emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
             uuid,
             internalTopologyBuilder);
 
@@ -1183,7 +1145,7 @@ public class StreamsPartitionAssignorTest {
             emptyTasks,
             UUID.randomUUID(),
             builder);
-        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, (Object) StreamsConfig.UPGRADE_FROM_0100));
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100));
 
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
 
@@ -1274,7 +1236,7 @@ public class StreamsPartitionAssignorTest {
             equalTo(new AssignmentInfo(
                 new ArrayList<>(activeTasks),
                 standbyTaskMap,
-                Collections.<HostInfo, Set<TopicPartition>>emptyMap()
+                Collections.emptyMap()
             )));
         assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1)));
 
@@ -1334,12 +1296,12 @@ public class StreamsPartitionAssignorTest {
     }
 
     private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
-        final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
-                                                       Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+        final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(),
+                                                       Collections.emptyMap(),
                                                        firstHostState);
 
         return new PartitionAssignor.Assignment(
-                Collections.<TopicPartition>emptyList(), info.encode());
+                Collections.emptyList(), info.encode());
     }
 
     private AssignmentInfo checkAssignment(final Set<String> expectedTopics,
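
Most of the StreamsPartitionAssignorTest churn is the same generics cleanup: since the call sites already fix the target type, explicit type witnesses on Collections.emptySet()/emptyMap() and on constructors like new HashSet<TaskId>() are redundant, so the diamond or plain form is used instead. A self-contained sketch of the pattern; the class name TypeWitnessSketch is illustrative.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;

    public class TypeWitnessSketch {
        public static void main(final String[] args) {
            // before: explicit type witnesses
            final Set<String> topicsBefore = Collections.<String>emptySet();
            final Map<String, Object> configBefore = Collections.<String, Object>emptyMap();

            // after: the element types are inferred from the target type of the assignment
            final Set<String> topicsAfter = Collections.emptySet();
            final Map<String, Object> configAfter = Collections.emptyMap();

            System.out.println(topicsBefore.equals(topicsAfter) && configBefore.equals(configAfter)); // true
        }
    }
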
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 79afb78..6df41fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -50,8 +50,7 @@ public class CompositeReadOnlyWindowStoreTest {
     private CompositeReadOnlyWindowStore<String, String>
         windowStore;
     private ReadOnlyWindowStoreStub<String, String> underlyingWindowStore;
-    private ReadOnlyWindowStoreStub<String, String>
-            otherUnderlyingStore;
+    private ReadOnlyWindowStoreStub<String, String> otherUnderlyingStore;
 
     @Rule
     public final ExpectedException windowStoreIteratorException = ExpectedException.none();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 5f18be9..aad7403 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -48,7 +48,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
     private final Map<Long, NavigableMap<K, V>> data = new HashMap<>();
     private boolean open  = true;
 
-    public ReadOnlyWindowStoreStub(final long windowSize) {
+    ReadOnlyWindowStoreStub(final long windowSize) {
         this.windowSize = windowSize;
     }
 
@@ -80,9 +80,10 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
-        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+        return fetch(
+            key, 
+            ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+            ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
     }
 
     @Override
@@ -103,9 +104,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
         return new KeyValueIterator<Windowed<K>, V>() {
             @Override
-            public void close() {
-
-            }
+            public void close() {}
 
             @Override
             public Windowed<K> peekNextKey() {
@@ -138,7 +137,9 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
         }
         final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
         for (final long now : data.keySet()) {
-            if (!(now >= timeFrom && now <= timeTo)) continue;
+            if (!(now >= timeFrom && now <= timeTo)) {
+                continue;
+            }
             final NavigableMap<K, V> kvMap = data.get(now);
             if (kvMap != null) {
                 for (final Entry<K, V> entry : kvMap.entrySet()) {
@@ -150,9 +151,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
         return new KeyValueIterator<Windowed<K>, V>() {
             @Override
-            public void close() {
-
-            }
+            public void close() {}
 
             @Override
             public Windowed<K> peekNextKey() {
@@ -179,9 +178,9 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
-        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+        return fetchAll(
+            ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+            ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
     }
 
     @SuppressWarnings("deprecation")
@@ -203,9 +202,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
         return new KeyValueIterator<Windowed<K>, V>() {
             @Override
-            public void close() {
-
-            }
+            public void close() {}
 
             @Override
             public Windowed<K> peekNextKey() {
@@ -234,14 +231,16 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
                                                             final K to,
                                                             final Instant fromTime,
                                                             final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
-        ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+        return fetch(
+            from,
+            to, 
+            ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
+            ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
     }
 
     public void put(final K key, final V value, final long timestamp) {
         if (!data.containsKey(timestamp)) {
-            data.put(timestamp, new TreeMap<K, V>());
+            data.put(timestamp, new TreeMap<>());
         }
         data.get(timestamp).put(key, value);
     }
@@ -252,19 +251,13 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
     }
 
     @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-
-    }
+    public void init(final ProcessorContext context, final StateStore root) {}
 
     @Override
-    public void flush() {
-
-    }
+    public void flush() {}
 
     @Override
-    public void close() {
-
-    }
+    public void close() {}
 
     @Override
     public boolean persistent() {
@@ -276,7 +269,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
         return open;
     }
 
-    public void setOpen(final boolean open) {
+    void setOpen(final boolean open) {
         this.open = open;
     }
 
@@ -289,9 +282,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
         }
 
         @Override
-        public void close() {
-
-        }
+        public void close() {}
 
         @Override
         public Long peekNextKey() {
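
In ReadOnlyWindowStoreStub the Instant-based fetch/fetchAll overloads now pass the result of ApiUtils.validateMillisecondInstant straight into the long-based overloads, which implies the helper validates and returns the epoch-milli value. The actual ApiUtils implementation is not part of this diff; the following is only a sketch of that shape, with a hypothetical stand-in method.

    import java.time.Instant;

    public class MillisValidationSketch {
        // illustrative stand-in for ApiUtils.validateMillisecondInstant: validate the input
        // and return the converted value so callers can nest the call in a single expression
        static long validateMillisecondInstant(final Instant instant, final String messagePrefix) {
            if (instant == null) {
                throw new IllegalArgumentException(messagePrefix + " must not be null");
            }
            return instant.toEpochMilli();
        }

        public static void main(final String[] args) {
            final Instant from = Instant.ofEpochMilli(0L);
            final Instant to = Instant.ofEpochMilli(10_000L);
            // before: validate, then convert separately; after: one nested call per argument
            System.out.println(validateMillisecondInstant(from, "from") + " .. " + validateMillisecondInstant(to, "to"));
        }
    }
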
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 7087298..078cbe4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
 
 public class SmokeTestDriver extends SmokeTestUtil {
 
-    public static final int MAX_RECORD_EMPTY_RETRIES = 60;
+    private static final int MAX_RECORD_EMPTY_RETRIES = 60;
 
     private static class ValueList {
         public final String key;
@@ -85,16 +85,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         final int numKeys = 20;
         final int maxRecordsPerKey = 1000;
 
-        final Thread driver = new Thread() {
-            public void run() {
-                try {
-                    final Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
-                    verify(kafka, allData, maxRecordsPerKey);
-                } catch (final Exception ex) {
-                    ex.printStackTrace();
-                }
+        final Thread driver = new Thread(() -> {
+            try {
+                final Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
+                verify(kafka, allData, maxRecordsPerKey);
+            } catch (final Exception ex) {
+                ex.printStackTrace();
             }
-        };
+        });
 
         final Properties props = new Properties();
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -601,10 +599,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
             for (final Map.Entry<String, Long> entry : map.entrySet()) {
                 final String key = entry.getKey();
                 Long expectedCount = expected.remove(key);
-                if (expectedCount == null)
+                if (expectedCount == null) {
                     expectedCount = 0L;
+                }
 
-                if (entry.getValue() != expectedCount) {
+                if (entry.getValue().longValue() != expectedCount.longValue()) {
                     if (print) {
                         System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
                     }
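
The one behavioral fix in SmokeTestDriver is the counts comparison: entry.getValue() != expectedCount compares two boxed Long references rather than their values, so equal counts outside the small Long cache could be reported as mismatches. The commit switches to comparing longValue(). A minimal sketch of the pitfall; the class name is illustrative.

    public class BoxedLongComparisonSketch {
        public static void main(final String[] args) {
            final Long observed = 1_000L; // autoboxed via Long.valueOf
            final Long expected = 1_000L;

            // '!=' on two Longs compares object identity; outside the cached range
            // this typically reports a difference even though the values match
            System.out.println(observed != expected);                         // typically true
            System.out.println(observed.longValue() != expected.longValue()); // false: value comparison
        }
    }
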
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 4c3a6b2..7b4c58b 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -16,19 +16,18 @@
  */
 package org.apache.kafka.streams.processor;
 
-import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.internals.QuietStreamsConfig;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -37,6 +36,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -124,7 +124,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         private final KeyValue keyValue;
 
         private CapturedForward(final To to, final KeyValue keyValue) {
-            if (keyValue == null) throw new IllegalArgumentException();
+            if (keyValue == null) {
+                throw new IllegalArgumentException();
+            }
 
             this.childName = to.childName;
             this.timestamp = to.timestamp;
@@ -396,12 +398,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return capturedPunctuator::cancel;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(interval, "interval");
-        return schedule(interval.toMillis(), type, callback);
+        return schedule(ApiUtils.validateMillisecondDuration(interval, "interval"), type, callback);
     }
 
     /**
@@ -411,9 +413,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      */
     @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedPunctuator> scheduledPunctuators() {
-        final LinkedList<CapturedPunctuator> capturedPunctuators = new LinkedList<>();
-        capturedPunctuators.addAll(punctuators);
-        return capturedPunctuators;
+        return new LinkedList<>(punctuators);
     }
 
     @SuppressWarnings("unchecked")
@@ -458,11 +458,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      *
      * @return A list of key/value pairs that were previously passed to the context.
      */
-    @SuppressWarnings({"WeakerAccess", "unused"})
     public List<CapturedForward> forwarded() {
-        final LinkedList<CapturedForward> result = new LinkedList<>();
-        result.addAll(capturedForwards);
-        return result;
+        return new LinkedList<>(capturedForwards);
     }
 
     /**
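
Besides routing the Duration overload of schedule() through the millisecond validation helper, MockProcessorContext now builds the defensive copies returned by scheduledPunctuators() and forwarded() with the LinkedList copy constructor instead of an explicit addAll. A short sketch of that equivalence; the class name and sample data are illustrative.

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;

    public class DefensiveCopySketch {
        public static void main(final String[] args) {
            final List<String> captured = Arrays.asList("a", "b", "c");

            // before: allocate an empty list, then copy into it
            final LinkedList<String> copyBefore = new LinkedList<>();
            copyBefore.addAll(captured);

            // after: the copy constructor produces the same independent copy in one step
            final LinkedList<String> copyAfter = new LinkedList<>(captured);

            System.out.println(copyBefore.equals(copyAfter)); // true
        }
    }
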
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 7fdfdbe..41b62f9 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.MockProcessorContext;
@@ -32,11 +31,13 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.junit.Test;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -120,7 +121,7 @@ public class MockProcessorContextTest {
 
             final CapturedForward forward1 = forwarded.next();
             assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
-            assertEquals(null, forward1.childName());
+            assertNull(forward1.childName());
 
             final CapturedForward forward2 = forwarded.next();
             assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
@@ -205,7 +206,9 @@ public class MockProcessorContextTest {
 
             @Override
             public void process(final String key, final Long value) {
-                if (++count > 2) context().commit();
+                if (++count > 2) {
+                    context().commit();
+                }
             }
         };
 

