kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-5671 Followup: Remove reflections in unit test classes
Date Wed, 02 Aug 2017 22:13:07 GMT
KAFKA-5671 Followup: Remove reflections in unit test classes

1. Remove the remaining deprecation warnings in streams:jar.

2. Consolidate the unit test classes' access to the internal topology builder from packages other than `o.a.k.streams` into static helper methods, replacing the previous reflection-based access. We still need to refactor the hierarchies of StreamTask, StreamThread and KafkaStreams to remove these hacky accessors altogether.

3. Minor fixes such as reference path, etc.

4. Minor edits on web docs for the describe function under developer-guide.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Damian Guy <damian.guy@gmail.com>

Closes #3603 from guozhangwang/K5671-followup-comments


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/125d69ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/125d69ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/125d69ca

Branch: refs/heads/trunk
Commit: 125d69caee993710bc07523fdd3deb0e06b10636
Parents: 630e9c5
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Aug 2 15:13:02 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 2 15:13:02 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html               | 36 ++++++++-----
 .../apache/kafka/streams/StreamsBuilder.java    |  3 +-
 .../apache/kafka/streams/kstream/KStream.java   |  2 +-
 .../kafka/streams/kstream/KStreamBuilder.java   |  7 ++-
 .../streams/kstream/internals/KStreamImpl.java  | 22 ++++----
 .../streams/kstream/internals/KTableImpl.java   |  9 ++--
 .../internals/InternalTopologyBuilder.java      |  4 +-
 .../internals/ProcessorContextImpl.java         |  7 ++-
 .../kafka/streams/StreamsBuilderTest.java       | 13 +++++
 ...eamsFineGrainedAutoResetIntegrationTest.java | 10 +++-
 .../integration/RegexSourceIntegrationTest.java | 10 ++--
 .../internals/GlobalKTableJoinsTest.java        |  4 +-
 .../internals/InternalStreamsBuilderTest.java   | 22 +++-----
 .../kstream/internals/KStreamBranchTest.java    |  2 +-
 .../kstream/internals/KStreamFilterTest.java    |  2 +-
 .../kstream/internals/KStreamFlatMapTest.java   |  1 -
 .../kstream/internals/KStreamForeachTest.java   |  2 +-
 .../kstream/internals/KStreamImplTest.java      | 10 ++--
 .../internals/KStreamKStreamJoinTest.java       | 11 ++--
 .../internals/KStreamKStreamLeftJoinTest.java   |  5 +-
 .../internals/KStreamKTableJoinTest.java        |  5 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  3 +-
 .../kstream/internals/KStreamMapTest.java       |  2 +-
 .../kstream/internals/KStreamMapValuesTest.java |  1 -
 .../kstream/internals/KStreamPeekTest.java      |  4 +-
 .../kstream/internals/KStreamSelectKeyTest.java |  2 +-
 .../kstream/internals/KTableAggregateTest.java  | 23 ++++-----
 .../kstream/internals/KTableFilterTest.java     | 33 ++++++------
 .../kstream/internals/KTableForeachTest.java    |  6 +--
 .../kstream/internals/KTableImplTest.java       | 53 ++++++++++----------
 .../kstream/internals/KTableKTableJoinTest.java | 31 +++---------
 .../internals/KTableKTableLeftJoinTest.java     |  3 +-
 .../internals/KTableKTableOuterJoinTest.java    |  3 +-
 .../kstream/internals/KTableMapKeysTest.java    |  8 +--
 .../kstream/internals/KTableMapValuesTest.java  | 11 ++--
 .../kstream/internals/KTableSourceTest.java     |  9 ++--
 .../processor/internals/StandbyTaskTest.java    | 10 ++--
 .../internals/StreamPartitionAssignorTest.java  |  6 +--
 .../processor/internals/StreamThreadTest.java   |  7 +--
 .../internals/StreamsMetadataStateTest.java     | 12 ++---
 .../apache/kafka/test/KStreamTestDriver.java    | 26 ++++------
 .../org/apache/kafka/test/StreamsTestUtils.java | 14 ------
 42 files changed, 216 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index f24b17f..529261f 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -215,13 +215,12 @@
         Thus, <code>TopologyDescritpion</code> allows to retrieve the DAG structure of the specified topology.
         <br />
         Note that global stores are listed explicitly as they are accessible by all nodes without the need to explicitly connect them.
-        Furthermore, nodes are grouped by <code>Subtopology</code>.
-        Subtopologies are groups of nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
-        For execution, each <code>Subtopology</code> is executed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
-        Thus, each <code>Subtopology</code> describes an independent unit of works that can be executed by different threads in parallel.
+        Furthermore, nodes are grouped by <code>Sub-topologies</code>, where each sub-topology is a group of processor nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
+        During execution, each <code>Sub-topology</code> will be processed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
+        Thus, each <code>Sub-topology</code> describes an independent unit of work that can be executed by different threads in parallel.
         <br />
-        Describing a <code>Topology</code> is helpful to reason about tasks and thus maximum parallelism.
-        It is also helpful to get insight into a <code>Topology</code> if it is not specified manually but via Kafka Streams DSL that is described in the next section.
+        Describing a <code>Topology</code> before starting your streams application with the specified topology is helpful to reason about tasks and thus maximum parallelism (we will talk about how to execute your written application later in this section).
+        It is also helpful to get insight into a <code>Topology</code> if it is not specified directly as described above but via Kafka Streams DSL (we will describe the DSL in the next section).
     </p>
 
     In the next section we present another way to build the processor topology: the Kafka Streams DSL.
@@ -425,6 +424,22 @@
     </pre>
     <br>
 
+    <h4><a id="streams_dsl_build" href="#streams_dsl_build">Generate the processor topology</a></h4>
+
+    <p>
+        Within the Streams DSL, while users are specifying the operations to create / transform various streams as described above, a <code>Topology</code> is constructed implicitly within the <code>StreamsBuilder</code>.
+        Users can generate the constructed topology at any given point in time by calling <code>build</code>:
+    </p>
+
+    <pre class="brush: java;">
+    Topology topology = builder.build();
+    </pre>
+
+    <p>
+        Users can investigate the generated <code>Topology</code> via its <code>describe</code> API, and continue building or modifying the topology until they are satisfied with it.
+        The topology then can be used to execute the application (we will talk about this later in this section).
+    </p>
+
     <h3><a id="streams_interactive_queries" href="#streams_interactive_queries">Interactive Queries</a></h3>
     <p>
         Interactive queries let you get more from streaming than just the processing of data. This feature allows you to treat the stream processing layer as a lightweight embedded database and, more concretely, <i>to directly query the latest state</i> of your stream processing application, without needing to materialize that state to external databases or external storage first.
@@ -737,7 +752,7 @@
         <code>StateStoreProvider#stores(String storeName, QueryableStoreType&lt;T&gt; queryableStoreType)</code> returns a <code>List</code> of state stores with the given <code>storeName</code> and of the type as defined by <code>queryableStoreType</code>.
     </p>
     <p>
-        An example implemention of the wrapper follows (Java 8+):
+        An example implementation of the wrapper follows (Java 8+):
     </p>
 
     <pre class="brush: java;">
@@ -1064,8 +1079,6 @@
         groupedStream.count(supplier)
     </pre>
 
-
-
     <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams Application</a></h4>
     <p>
         You can call Kafka Streams from anywhere in your application code.
@@ -1074,9 +1087,8 @@
 
     <p>
         First, you must create an instance of <code>KafkaStreams</code>.
-        The first argument of the <code>KafkaStreams</code> constructor takes a <code>Topology</code>
-        that is a logical topology description (you can create a <code>Topology</code> either directly or use
-        <code>StreamsBuilder</code> to create one).
+        The first argument of the <code>KafkaStreams</code> constructor takes an instance of <code>Topology</code>.
+        This topology can be either created directly following the <code>Processor</code> API or implicitly via the <code>StreamsBuilder</code> in the higher-level Streams DSL.
         The second argument is an instance of <code>StreamsConfig</code> mentioned above.
     </p>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 3ce1521..f91aca4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -49,8 +49,9 @@ public class StreamsBuilder {
 
     /** The actual topology that is constructed by this StreamsBuilder. */
     private final Topology topology = new Topology();
+
     /** The topology's internal builder. */
-    private final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
+    final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
 
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8b3ceec..5f1ba6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -52,7 +52,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
  * @param <V> Type of values
  * @see KTable
  * @see KGroupedStream
- * @see org.apache.kafka.streams.StreamsBuilder#stream(String...)
+ * @see StreamsBuilder#stream(String...)
  */
 @InterfaceStability.Evolving
 public interface KStream<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index af05170..d1dd6ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplie
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
@@ -51,17 +50,17 @@ import java.util.regex.Pattern;
  * @deprecated Use {@link org.apache.kafka.streams.StreamsBuilder StreamsBuilder} instead
  */
 @Deprecated
-public class KStreamBuilder extends TopologyBuilder {
+public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyBuilder {
 
     private final AtomicInteger index = new AtomicInteger(0);
 
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
 
-    private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
+    private Topology.AutoOffsetReset translateAutoOffsetReset(final org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset resetPolicy) {
         if (resetPolicy == null) {
             return null;
         }
-        return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
+        return resetPolicy == org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index d357356..040b66f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -52,6 +52,13 @@ import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
 
+    // TODO: change to package-private after removing KStreamBuilder
+    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
+
+    static final String SINK_NAME = "KSTREAM-SINK-";
+
+    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
     private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
 
     private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
@@ -88,10 +95,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
 
-    static final String SINK_NAME = "KSTREAM-SINK-";
-
-    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
-
     private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
 
     private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
@@ -100,8 +103,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
 
-    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
-
     private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
 
     private final boolean repartitionRequired;
@@ -439,7 +440,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
-                   StreamPartitioner<? super K, ? super V> partitioner,
+                   final StreamPartitioner<? super K, ? super V> partitioner,
                    final String topic) {
         Objects.requireNonNull(topic, "topic can't be null");
         final String name = builder.newName(SINK_NAME);
@@ -449,10 +450,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
             final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
+            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
+            builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, windowedPartitioner, this.name);
+        } else {
+            builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
         }
-
-        builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 4aca713..46f2636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -50,6 +50,11 @@ import java.util.Set;
  */
 public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
 
+    // TODO: these two fields can be package-private after KStreamBuilder is removed
+    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+    public static final String STATE_STORE_NAME = "STATE-STORE-";
+
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
     private static final String FOREACH_NAME = "KTABLE-FOREACH-";
@@ -66,12 +71,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String SELECT_NAME = "KTABLE-SELECT-";
 
-    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
-
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
-    public static final String STATE_STORE_NAME = "STATE-STORE-";
-
     private final ProcessorSupplier<?, ?> processorSupplier;
 
     private final KeyValueMapper<K, V, String> defaultKeyValueMapper;

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 0d5cd48..c7f70a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -909,7 +909,7 @@ public class InternalTopologyBuilder {
 
     public synchronized Pattern earliestResetTopicsPattern() {
         final List<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics);
-        final Pattern earliestPattern =  buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
+        final Pattern earliestPattern = buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
 
         ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics);
 
@@ -925,6 +925,8 @@ public class InternalTopologyBuilder {
         return  latestPattern;
     }
 
+    // TODO: we should check regex overlap at topology construction time and then throw TopologyException
+    //       instead of at runtime. See KAFKA-5660
     private void ensureNoRegexOverlap(final Pattern builtPattern,
                                       final Set<Pattern> otherPatterns,
                                       final Set<String> otherTopics) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index eb2a171..d9d7d27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -55,13 +54,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     }
 
     /**
-     * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node
      */
     @SuppressWarnings("deprecation")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {
-            throw new TopologyBuilderException("Accessing from an unknown node");
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node");
         }
 
         final StateStore global = stateManager.getGlobalStore(name);
@@ -70,7 +69,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         if (!currentNode().stateStores.contains(name)) {
-            throw new TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name);
         }
 
         return stateManager.getStore(name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 3e167d0..e4ee7c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -21,11 +21,15 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Collection;
+import java.util.Set;
+
 import static org.junit.Assert.assertEquals;
 
 public class StreamsBuilderTest {
@@ -113,4 +117,13 @@ public class StreamsBuilderTest {
         builder.stream(Serdes.String(), Serdes.String(), null, null);
     }
 
+    // TODO: these two static functions are added because some non-TopologyBuilder unit tests need to access the internal topology builder,
+    //       which is usually a bad sign of design patterns between TopologyBuilder and StreamThread. We need to consider getting rid of them later
+    public static InternalTopologyBuilder internalTopologyBuilder(final StreamsBuilder builder) {
+        return builder.internalTopologyBuilder;
+    }
+
+    public static Collection<Set<String>> getCopartitionedGroups(final StreamsBuilder builder) {
+        return builder.internalTopologyBuilder.copartitionGroups();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index d730c6e..2ae5cc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -248,10 +248,14 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
         builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
         builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
 
+        // TODO: we should check regex overlap at topology construction time and then throw TopologyException
+        //       instead of at runtime. See KAFKA-5660
         try {
             builder.earliestResetTopicsPattern();
             fail("Should have thrown TopologyException");
-        } catch (final TopologyException expected) { }
+        } catch (final TopologyException expected) {
+            // do nothing
+        }
     }
 
     @Test
@@ -263,7 +267,9 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
         try {
             builder.stream(Topology.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
             fail("Should have thrown TopologyException");
-        } catch (final org.apache.kafka.streams.errors.TopologyException expected) { }
+        } catch (final TopologyException expected) {
+            // do nothing
+        }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index f7757a5..b5b6e4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -28,11 +28,11 @@ import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -156,7 +156,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalThread = streamThreads[0];
 
         final TestStreamThread testStreamThread = new TestStreamThread(
-            InternalStreamsBuilderTest.internalTopologyBuilder(builder),
+            StreamsBuilderTest.internalTopologyBuilder(builder),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalThread.applicationId,
@@ -216,7 +216,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalThread = streamThreads[0];
 
         final TestStreamThread testStreamThread = new TestStreamThread(
-            InternalStreamsBuilderTest.internalTopologyBuilder(builder),
+            StreamsBuilderTest.internalTopologyBuilder(builder),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalThread.applicationId,
@@ -363,7 +363,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalLeaderThread = leaderStreamThreads[0];
 
         final TestStreamThread leaderTestStreamThread = new TestStreamThread(
-            InternalStreamsBuilderTest.internalTopologyBuilder(builderLeader),
+            StreamsBuilderTest.internalTopologyBuilder(builderLeader),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalLeaderThread.applicationId,
@@ -389,7 +389,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalFollowerThread = followerStreamThreads[0];
 
         final TestStreamThread followerTestStreamThread = new TestStreamThread(
-            InternalStreamsBuilderTest.internalTopologyBuilder(builderFollower),
+            StreamsBuilderTest.internalTopologyBuilder(builderFollower),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalFollowerThread.applicationId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 52e78d7..87e15be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -70,7 +70,7 @@ public class GlobalKTableJoinsTest {
     }
 
     @Test
-    public void shouldLeftJoinWithStream() throws Exception {
+    public void shouldLeftJoinWithStream() {
         stream.leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
                 .foreach(action);
 
@@ -84,7 +84,7 @@ public class GlobalKTableJoinsTest {
     }
 
     @Test
-    public void shouldInnerJoinWithStream() throws Exception {
+    public void shouldInnerJoinWithStream() {
         stream.join(global, keyValueMapper,  MockValueJoiner.TOSTRING_JOINER)
                 .foreach(action);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index a29d4af..fc9a7fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -37,7 +36,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -61,20 +59,6 @@ public class InternalStreamsBuilderTest {
 
     private KStreamTestDriver driver = null;
 
-    public static InternalTopologyBuilder internalTopologyBuilder(final StreamsBuilder streamsBuilder) throws NoSuchFieldException, IllegalAccessException {
-        final Field internalStreamsBuilderField = streamsBuilder.getClass().getDeclaredField("internalStreamsBuilder");
-        internalStreamsBuilderField.setAccessible(true);
-        final InternalStreamsBuilder internalStreamsBuilder = (InternalStreamsBuilder) internalStreamsBuilderField.get(streamsBuilder);
-
-        return internalTopologyBuilder(internalStreamsBuilder);
-    }
-
-    public static InternalTopologyBuilder internalTopologyBuilder(final InternalStreamsBuilder internalStreamsBuilder) throws NoSuchFieldException, IllegalAccessException {
-        final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
-        internalTopologyBuilderField.setAccessible(true);
-        return (InternalTopologyBuilder) internalTopologyBuilderField.get(internalStreamsBuilder);
-    }
-
     @Before
     public void setUp() {
         builder.internalTopologyBuilder.setApplicationId(APP_ID);
@@ -367,4 +351,10 @@ public class InternalStreamsBuilderTest {
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
+
+    // TODO: this static function was added because some non-TopologyBuilder unit tests need to access the internal topology builder,
+    //       which is usually a sign of a design problem between TopologyBuilder and StreamThread. We need to consider getting rid of it later.
+    public static InternalTopologyBuilder internalTopologyBuilder(final InternalStreamsBuilder internalStreamsBuilder) {
+        return internalStreamsBuilder.internalTopologyBuilder;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 03401b3..f43d138 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -88,7 +88,7 @@ public class KStreamBranchTest {
     }
 
     @Test
-    public void testTypeVariance() throws Exception {
+    public void testTypeVariance() {
         Predicate<Number, Object> positive = new Predicate<Number, Object>() {
             @Override
             public boolean test(Number key, Object value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 4fc203b..cc85dc5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -82,7 +82,7 @@ public class KStreamFilterTest {
     }
 
     @Test
-    public void testTypeVariance() throws Exception {
+    public void testTypeVariance() {
         Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
             @Override
             public boolean test(Number key, Object value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index efe7be0..c2a01c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -75,5 +75,4 @@ public class KStreamFlatMapTest {
             assertEquals(expected[i], processor.processed.get(i));
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index a929c9c..4a46005 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -88,7 +88,7 @@ public class KStreamForeachTest {
     }
 
     @Test
-    public void testTypeVariance() throws Exception {
+    public void testTypeVariance() {
         ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
             @Override
             public void apply(Number key, Object value) {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index afe116e..3e50abb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -61,7 +62,7 @@ public class KStreamImplTest {
     }
 
     @Test
-    public void testNumProcesses() throws Exception {
+    public void testNumProcesses() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         KStream<String, String> source1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
@@ -154,11 +155,11 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            InternalStreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size());
+            StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size());
     }
 
     @Test
-    public void shouldUseRecordMetadataTimestampExtractorWithThrough() throws Exception {
+    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
         final StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> stream1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
         KStream<String, String> stream2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
@@ -166,7 +167,7 @@ public class KStreamImplTest {
         stream1.to("topic-5");
         stream2.through("topic-6");
 
-        ProcessorTopology processorTopology = InternalStreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null);
+        ProcessorTopology processorTopology = StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null);
         assertThat(processorTopology.source("topic-6").getTimestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
         assertEquals(processorTopology.source("topic-4").getTimestampExtractor(), null);
         assertEquals(processorTopology.source("topic-3").getTimestampExtractor(), null);
@@ -175,6 +176,7 @@ public class KStreamImplTest {
     }
 
     @Test
+    // TODO: this test should be refactored once we remove KStreamBuilder, so that the created Topology contains internal topics as well
     public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
         final KStreamBuilder builder = new KStreamBuilder();
         KStream<String, String> kStream = builder.stream(stringSerde, stringSerde, "topic-1");

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index a87bf60..a733fae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -74,7 +75,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -172,7 +173,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.outerJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -273,7 +274,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -500,7 +501,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -609,7 +610,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 410ac32..e0ba9fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -75,7 +76,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        final Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -165,7 +166,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        final Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index f32147e..8f10923 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -69,7 +70,7 @@ public class KStreamKTableJoinTest {
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        final Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -132,6 +133,4 @@ public class KStreamKTableJoinTest {
 
         processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 8c4cc38..df1f4a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -70,7 +71,7 @@ public class KStreamKTableLeftJoinTest {
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 877fffa..1d4dbb5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -73,7 +73,7 @@ public class KStreamMapTest {
     }
 
     @Test
-    public void testTypeVariance() throws Exception {
+    public void testTypeVariance() {
         KeyValueMapper<Number, Object, KeyValue<Number, String>> stringify = new KeyValueMapper<Number, Object, KeyValue<Number, String>>() {
             @Override
             public KeyValue<Number, String> apply(Number key, Object value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 34e73a7..9d46a50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -69,5 +69,4 @@ public class KStreamMapValuesTest {
             assertEquals(expected[i], processor.processed.get(i));
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 1dc9cd5..9c78c31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -67,7 +67,9 @@ public class KStreamPeekTest {
         try {
             stream.peek(null);
             fail("expected null action to throw NPE");
-        } catch (NullPointerException expected) { }
+        } catch (NullPointerException expected) {
+            // do nothing
+        }
     }
 
     private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> into) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index d1d785d..45c47fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -82,7 +82,7 @@ public class KStreamSelectKeyTest {
     }
 
     @Test
-    public void testTypeVariance() throws Exception {
+    public void testTypeVariance() {
         ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
             @Override
             public void apply(Number key, Object value) {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index d7360ba..f31e232 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -41,7 +41,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -59,12 +58,12 @@ public class KTableAggregateTest {
     public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
-    public void testAggBasic() throws Exception {
+    public void testAggBasic() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
@@ -113,7 +112,7 @@ public class KTableAggregateTest {
 
 
     @Test
-    public void testAggCoalesced() throws Exception {
+    public void testAggCoalesced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
@@ -142,7 +141,7 @@ public class KTableAggregateTest {
 
 
     @Test
-    public void testAggRepartition() throws Exception {
+    public void testAggRepartition() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
@@ -203,7 +202,7 @@ public class KTableAggregateTest {
                 ), proc.processed);
     }
 
-    private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) throws IOException {
+    private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) {
         driver.setUp(builder, stateDir);
 
         driver.process(input, "A", "green");
@@ -229,7 +228,7 @@ public class KTableAggregateTest {
     }
 
     @Test
-    public void testCount() throws IOException {
+    public void testCount() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
@@ -244,7 +243,7 @@ public class KTableAggregateTest {
     }
 
     @Test
-    public void testCountWithInternalStore() throws IOException {
+    public void testCountWithInternalStore() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
@@ -259,7 +258,7 @@ public class KTableAggregateTest {
     }
 
     @Test
-    public void testCountCoalesced() throws IOException {
+    public void testCountCoalesced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
@@ -288,7 +287,7 @@ public class KTableAggregateTest {
     }
     
     @Test
-    public void testRemoveOldBeforeAddNew() throws IOException {
+    public void testRemoveOldBeforeAddNew() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
@@ -343,7 +342,7 @@ public class KTableAggregateTest {
     }
 
     @Test
-    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() throws Exception {
+    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() {
         final String tableOne = "tableOne";
         final String tableTwo = "tableTwo";
         final StreamsBuilder builder = new StreamsBuilder();
@@ -372,7 +371,7 @@ public class KTableAggregateTest {
                     }
                 }, "reducer-store");
 
-        reduce.foreach(new ForeachAction<String, Long>() {
+        reduce.toStream().foreach(new ForeachAction<String, Long>() {
             @Override
             public void apply(final String key, final Long value) {
                 reduceResults.put(key, value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index d466295..3350072 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -31,7 +31,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -45,7 +44,7 @@ public class KTableFilterTest {
     private File stateDir = null;
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
@@ -75,7 +74,7 @@ public class KTableFilterTest {
     public void testKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
 
         KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
 
@@ -99,7 +98,7 @@ public class KTableFilterTest {
     public void testQueryableKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
 
         KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
 
@@ -122,7 +121,7 @@ public class KTableFilterTest {
     private void doTestValueGetter(final StreamsBuilder builder,
                                    final KTableImpl<String, Integer, Integer> table2,
                                    final KTableImpl<String, Integer, Integer> table3,
-                                   final String topic1) throws IOException {
+                                   final String topic1) {
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
 
@@ -180,7 +179,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testValueGetter() throws IOException {
+    public void testValueGetter() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -206,7 +205,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableValueGetter() throws IOException {
+    public void testQueryableValueGetter() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -234,7 +233,7 @@ public class KTableFilterTest {
     private void doTestNotSendingOldValue(final StreamsBuilder builder,
                                           final KTableImpl<String, Integer, Integer> table1,
                                           final KTableImpl<String, Integer, Integer> table2,
-                                          final String topic1) throws IOException {
+                                          final String topic1) {
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
 
@@ -271,7 +270,7 @@ public class KTableFilterTest {
 
 
     @Test
-    public void testNotSendingOldValue() throws IOException {
+    public void testNotSendingOldValue() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -290,7 +289,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableNotSendingOldValue() throws IOException {
+    public void testQueryableNotSendingOldValue() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -311,7 +310,7 @@ public class KTableFilterTest {
     private void doTestSendingOldValue(final StreamsBuilder builder,
                                        final KTableImpl<String, Integer, Integer> table1,
                                        final KTableImpl<String, Integer, Integer> table2,
-                                       final String topic1) throws IOException {
+                                       final String topic1) {
         table2.enableSendingOldValues();
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
@@ -349,7 +348,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testSendingOldValue() throws IOException {
+    public void testSendingOldValue() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -368,7 +367,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableSendingOldValue() throws IOException {
+    public void testQueryableSendingOldValue() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -389,7 +388,7 @@ public class KTableFilterTest {
     private void doTestSkipNullOnMaterialization(final StreamsBuilder builder,
                                                  final KTableImpl<String, String, String> table1,
                                                  final KTableImpl<String, String, String> table2,
-                                                 final String topic1) throws IOException {
+                                                 final String topic1) {
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
 
@@ -407,7 +406,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testSkipNullOnMaterialization() throws IOException {
+    public void testSkipNullOnMaterialization() {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
         StreamsBuilder builder = new StreamsBuilder();
 
@@ -428,7 +427,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableSkipNullOnMaterialization() throws IOException {
+    public void testQueryableSkipNullOnMaterialization() {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
         StreamsBuilder builder = new StreamsBuilder();
 
@@ -449,7 +448,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testTypeVariance() throws Exception {
+    public void testTypeVariance() {
         Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
             @Override
             public boolean test(Number key, Object value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index d218d4b..693aac8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -29,7 +29,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -37,6 +36,7 @@ import java.util.Locale;
 
 import static org.junit.Assert.assertEquals;
 
+@Deprecated
 public class KTableForeachTest {
 
     final private String topicName = "topic";
@@ -47,7 +47,7 @@ public class KTableForeachTest {
     public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
@@ -98,7 +98,7 @@ public class KTableForeachTest {
     }
 
     @Test
-    public void testTypeVariance() throws Exception {
+    public void testTypeVariance() {
         ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
             @Override
             public void apply(Number key, Object value) {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index f791012..f06cc63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -40,7 +40,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.lang.reflect.Field;
 
 import static org.junit.Assert.assertEquals;
@@ -58,7 +57,7 @@ public class KTableImplTest {
     private KTable<String, String> table;
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
         builder = new StreamsBuilder();
         table = builder.table("test", "test");
@@ -122,7 +121,7 @@ public class KTableImplTest {
     }
 
     @Test
-    public void testValueGetter() throws IOException {
+    public void testValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -250,7 +249,7 @@ public class KTableImplTest {
     }
 
     @Test
-    public void testStateStoreLazyEval() throws IOException {
+    public void testStateStoreLazyEval() {
         String topic1 = "topic1";
         String topic2 = "topic2";
         String storeName1 = "storeName1";
@@ -285,7 +284,7 @@ public class KTableImplTest {
     }
 
     @Test
-    public void testStateStore() throws IOException {
+    public void testStateStore() {
         String topic1 = "topic1";
         String topic2 = "topic2";
         String storeName1 = "storeName1";
@@ -328,7 +327,7 @@ public class KTableImplTest {
     }
 
     @Test
-    public void testRepartition() throws Exception {
+    public void testRepartition() throws NoSuchFieldException, IllegalAccessException {
         String topic1 = "topic1";
         String storeName1 = "storeName1";
 
@@ -368,110 +367,110 @@ public class KTableImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullSelectorOnToStream() throws Exception {
+    public void shouldNotAllowNullSelectorOnToStream() {
         table.toStream(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicOnTo() throws Exception {
+    public void shouldNotAllowNullTopicOnTo() {
         table.to(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullPredicateOnFilter() throws Exception {
+    public void shouldNotAllowNullPredicateOnFilter() {
         table.filter(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullPredicateOnFilterNot() throws Exception {
+    public void shouldNotAllowNullPredicateOnFilterNot() {
         table.filterNot(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnMapValues() throws Exception {
+    public void shouldNotAllowNullMapperOnMapValues() {
         table.mapValues(null);
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
+    public void shouldNotAllowNullFilePathOnWriteAsText() {
         table.writeAsText(null);
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = TopologyException.class)
-    public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
+    public void shouldNotAllowEmptyFilePathOnWriteAsText() {
         table.writeAsText("\t  \t");
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullActionOnForEach() throws Exception {
+    public void shouldNotAllowNullActionOnForEach() {
         table.foreach(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldAllowNullTopicInThrough() throws Exception {
+    public void shouldAllowNullTopicInThrough() {
         table.through((String) null, "store");
     }
 
     @Test
-    public void shouldAllowNullStoreInThrough() throws Exception {
+    public void shouldAllowNullStoreInThrough() {
         table.through("topic", (String) null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullSelectorOnGroupBy() throws Exception {
+    public void shouldNotAllowNullSelectorOnGroupBy() {
         table.groupBy(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullOtherTableOnJoin() throws Exception {
+    public void shouldNotAllowNullOtherTableOnJoin() {
         table.join(null, MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test
-    public void shouldAllowNullStoreInJoin() throws Exception {
+    public void shouldAllowNullStoreInJoin() {
         table.join(table, MockValueJoiner.TOSTRING_JOINER, null, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStoreSupplierInJoin() throws Exception {
+    public void shouldNotAllowNullStoreSupplierInJoin() {
         table.join(table, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStoreSupplierInLeftJoin() throws Exception {
+    public void shouldNotAllowNullStoreSupplierInLeftJoin() {
         table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStoreSupplierInOuterJoin() throws Exception {
+    public void shouldNotAllowNullStoreSupplierInOuterJoin() {
         table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullJoinerJoin() throws Exception {
+    public void shouldNotAllowNullJoinerJoin() {
         table.join(table, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullOtherTableOnOuterJoin() throws Exception {
+    public void shouldNotAllowNullOtherTableOnOuterJoin() {
         table.outerJoin(null, MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullJoinerOnOuterJoin() throws Exception {
+    public void shouldNotAllowNullJoinerOnOuterJoin() {
         table.outerJoin(table, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullJoinerOnLeftJoin() throws Exception {
+    public void shouldNotAllowNullJoinerOnLeftJoin() {
         table.leftJoin(table, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullOtherTableOnLeftJoin() throws Exception {
+    public void shouldNotAllowNullOtherTableOnLeftJoin() {
         table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index dd14b55..124114b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -30,8 +31,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -56,28 +55,15 @@ public class KTableKTableJoinTest {
     public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    public static Collection<Set<String>> getCopartitionedGroups(StreamsBuilder builder) {
-        // TODO: we should refactor this to avoid usage of reflection
-        try {
-            final Field internalStreamsBuilderField = builder.getClass().getDeclaredField("internalStreamsBuilder");
-            internalStreamsBuilderField.setAccessible(true);
-            final InternalStreamsBuilder internalStreamsBuilder = (InternalStreamsBuilder) internalStreamsBuilderField.get(builder);
-
-            return internalStreamsBuilder.internalTopologyBuilder.copartitionGroups();
-        } catch (final NoSuchFieldException | IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     private void doTestJoin(final StreamsBuilder builder,
                             final int[] expectedKeys,
                             final MockProcessorSupplier<Integer, String> processor,
                             final KTable<Integer, String> joined) {
-        final Collection<Set<String>> copartitionGroups = getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -168,7 +154,7 @@ public class KTableKTableJoinTest {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -189,7 +175,7 @@ public class KTableKTableJoinTest {
 
 
     @Test
-    public void testQueryableJoin() throws Exception {
+    public void testQueryableJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -216,7 +202,6 @@ public class KTableKTableJoinTest {
                                         final KTable<Integer, String> joined,
                                         final boolean sendOldValues) {
 
-
         driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
@@ -290,7 +275,7 @@ public class KTableKTableJoinTest {
     }
 
     @Test
-    public void testNotSendingOldValues() throws Exception {
+    public void testNotSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -311,7 +296,7 @@ public class KTableKTableJoinTest {
     }
 
     @Test
-    public void testQueryableNotSendingOldValues() throws Exception {
+    public void testQueryableNotSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -332,7 +317,7 @@ public class KTableKTableJoinTest {
     }
 
     @Test
-    public void testSendingOldValues() throws Exception {
+    public void testSendingOldValues() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 5abf948..fe92f2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -77,7 +78,7 @@ public class KTableKTableLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         joined.toStream().process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 107e76a..7ab6d87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -76,7 +77,7 @@ public class KTableKTableOuterJoinTest {
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
-        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
+        Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index ba27e8d..756404f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -32,7 +32,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -48,7 +47,7 @@ public class KTableMapKeysTest {
 
     
     @Before
-     public void setUp() throws IOException {
+     public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
@@ -78,8 +77,6 @@ public class KTableMapKeysTest {
         final int[] originalKeys = new int[]{1, 2, 3};
         final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"};
 
-
-
         MockProcessorSupplier<String, String> processor = new MockProcessorSupplier<>();
 
         convertedStream.process(processor);
@@ -96,7 +93,4 @@ public class KTableMapKeysTest {
             assertEquals(expected[i], processor.processed.get(i));
         }
     }
-
-
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 6f44112..2e7ccad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -31,7 +31,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -46,7 +45,7 @@ public class KTableMapValuesTest {
     private File stateDir = null;
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
@@ -203,7 +202,7 @@ public class KTableMapValuesTest {
     }
 
     @Test
-    public void testValueGetter() throws IOException {
+    public void testValueGetter() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -234,7 +233,7 @@ public class KTableMapValuesTest {
     }
 
     @Test
-    public void testQueryableValueGetter() throws IOException {
+    public void testQueryableValueGetter() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -265,7 +264,7 @@ public class KTableMapValuesTest {
     }
 
     @Test
-    public void testNotSendingOldValue() throws IOException {
+    public void testNotSendingOldValue() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -313,7 +312,7 @@ public class KTableMapValuesTest {
     }
 
     @Test
-    public void testSendingOldValue() throws IOException {
+    public void testSendingOldValue() {
         StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 0c808bc..3f8a6b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -29,7 +29,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -44,7 +43,7 @@ public class KTableSourceTest {
     private File stateDir = null;
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
@@ -73,7 +72,7 @@ public class KTableSourceTest {
     }
 
     @Test
-    public void testValueGetter() throws IOException {
+    public void testValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -117,7 +116,7 @@ public class KTableSourceTest {
     }
 
     @Test
-    public void testNotSendingOldValue() throws IOException {
+    public void testNotSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
@@ -155,7 +154,7 @@ public class KTableSourceTest {
     }
 
     @Test
-    public void testSendingOldValue() throws IOException {
+    public void testSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";


Mime
View raw message