kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3623: KStreamTestDriver extends ExternalResource
Date Tue, 01 Aug 2017 23:16:40 GMT
KAFKA-3623: KStreamTestDriver extends ExternalResource

In the streams project, a number of unit tests contain duplicate code in
their tearDown() methods, each of which tries to close the KStreamTestDriver
connection. The goal of this changeset is to eliminate this duplication by
converting the KStreamTestDriver class into an ExternalResource, which is a
base class for JUnit rules.

In every unit test that uses KStreamTestDriver, we annotate the KStreamTestDriver
field with the @Rule annotation. In the KStreamTestDriver class, we override the
after() method. This after() method in turn calls the close() method, which was
previously called in the tearDown() method of each unit test. Because the
KStreamTestDriver is annotated as a Rule, after() is called automatically after
every test case.

Author: johnma14 <mariamj@us.ibm.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3589 from johnma14/bug/KAFKA-3623


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/edcefccf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/edcefccf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/edcefccf

Branch: refs/heads/trunk
Commit: edcefccfd7bd83110746fa81669e0d69ece5bb4b
Parents: 228a4fd
Author: Mariam John <mariamj@us.ibm.com>
Authored: Tue Aug 1 16:16:34 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Aug 1 16:16:34 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/StreamsBuilderTest.java       | 19 ++---
 .../streams/kstream/KStreamBuilderTest.java     | 20 ++---
 .../kstream/internals/AbstractStreamTest.java   | 15 ++--
 .../internals/GlobalKTableJoinsTest.java        | 15 +---
 .../internals/KGroupedStreamImplTest.java       | 21 ++---
 .../internals/KGroupedTableImplTest.java        | 15 +---
 .../kstream/internals/KStreamBranchTest.java    | 15 +---
 .../kstream/internals/KStreamFilterTest.java    | 20 ++---
 .../kstream/internals/KStreamFlatMapTest.java   | 15 +---
 .../internals/KStreamFlatMapValuesTest.java     | 15 +---
 .../kstream/internals/KStreamForeachTest.java   | 16 +---
 .../internals/KStreamKStreamJoinTest.java       | 23 ++----
 .../internals/KStreamKStreamLeftJoinTest.java   | 18 ++---
 .../internals/KStreamKTableJoinTest.java        | 16 +---
 .../internals/KStreamKTableLeftJoinTest.java    | 15 +---
 .../kstream/internals/KStreamMapTest.java       | 16 +---
 .../kstream/internals/KStreamMapValuesTest.java | 16 +---
 .../kstream/internals/KStreamPeekTest.java      | 15 ++--
 .../kstream/internals/KStreamPrintTest.java     | 18 ++---
 .../kstream/internals/KStreamSelectKeyTest.java | 16 +---
 .../kstream/internals/KStreamTransformTest.java | 16 +---
 .../internals/KStreamTransformValuesTest.java   | 16 +---
 .../internals/KStreamWindowAggregateTest.java   | 21 ++---
 .../kstream/internals/KTableAggregateTest.java  | 26 +++----
 .../kstream/internals/KTableFilterTest.java     | 24 ++----
 .../kstream/internals/KTableForeachTest.java    | 16 +---
 .../kstream/internals/KTableImplTest.java       | 24 ++----
 .../kstream/internals/KTableKTableJoinTest.java | 18 ++---
 .../internals/KTableKTableLeftJoinTest.java     | 22 ++----
 .../internals/KTableKTableOuterJoinTest.java    | 20 ++---
 .../kstream/internals/KTableMapKeysTest.java    | 18 ++---
 .../kstream/internals/KTableMapValuesTest.java  | 22 ++----
 .../kstream/internals/KTableSourceTest.java     | 22 ++----
 .../apache/kafka/test/KStreamTestDriver.java    | 82 +++++++++++---------
 34 files changed, 225 insertions(+), 461 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index b0a0743..3e167d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -32,15 +32,8 @@ public class StreamsBuilderTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
 
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test(expected = TopologyException.class)
     public void testFrom() {
@@ -58,7 +51,7 @@ public class StreamsBuilderTest {
 
         source.process(processorSupplier);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         driver.setTime(0L);
 
         driver.process("topic-source", "A", "aa");
@@ -78,7 +71,7 @@ public class StreamsBuilderTest {
         source.process(sourceProcessorSupplier);
         through.process(throughProcessorSupplier);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         driver.setTime(0L);
 
         driver.process("topic-source", "A", "aa");
@@ -99,7 +92,7 @@ public class StreamsBuilderTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         driver.setTime(0L);
 
         driver.process(topic1, "A", "aa");

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 719bab3..ab17c9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -31,8 +31,8 @@ import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.HashSet;
@@ -55,22 +55,14 @@ public class KStreamBuilderTest {
     private static final String APP_ID = "app-id";
 
     private final KStreamBuilder builder = new KStreamBuilder();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() {
         builder.setApplicationId(APP_ID);
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test(expected = TopologyBuilderException.class)
     public void testFrom() {
         builder.stream("topic-1", "topic-2");
@@ -101,7 +93,7 @@ public class KStreamBuilderTest {
 
         source.process(processorSupplier);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         driver.setTime(0L);
 
         driver.process("topic-source", "A", "aa");
@@ -121,7 +113,7 @@ public class KStreamBuilderTest {
         source.process(sourceProcessorSupplier);
         through.process(throughProcessorSupplier);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         driver.setTime(0L);
 
         driver.process("topic-source", "A", "aa");
@@ -155,7 +147,7 @@ public class KStreamBuilderTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         driver.setTime(0L);
 
         driver.process(topic1, "A", "aa");

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 96e1373..6b91b2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Random;
@@ -33,14 +33,9 @@ import static org.junit.Assert.assertTrue;
 
 public class AbstractStreamTest {
 
-    private KStreamTestDriver driver;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-    }
+    private final String topicName = "topic";
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testShouldBeExtensible() {
@@ -53,7 +48,7 @@ public class AbstractStreamTest {
 
         stream.randomFilter().process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, "V" + expectedKey);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 9f6c023..52e78d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -25,8 +25,8 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -47,7 +47,8 @@ public class GlobalKTableJoinsTest {
     private KStream<String, String> stream;
     private KeyValueMapper<String, String, String> keyValueMapper;
     private ForeachAction<String, String> action;
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() throws Exception {
@@ -68,14 +69,6 @@ public class GlobalKTableJoinsTest {
         };
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void shouldLeftJoinWithStream() throws Exception {
         stream.leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
@@ -103,7 +96,7 @@ public class GlobalKTableJoinsTest {
     }
 
     private void verifyJoin(final Map<String, String> expected, final String joinInput) {
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
         // write some data to the global table
         driver.process(globalTopic, "a", "A");

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 5a7147f..e8f2a01 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -41,8 +41,8 @@ import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -62,7 +62,8 @@ public class KGroupedStreamImplTest {
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private final StreamsBuilder builder = new StreamsBuilder();
     private KGroupedStream<String, String> groupedStream;
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void before() {
@@ -70,14 +71,6 @@ public class KGroupedStreamImplTest {
         groupedStream = stream.groupByKey(Serdes.String(), Serdes.String());
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullReducerOnReduce() throws Exception {
         groupedStream.reduce(null, "store");
@@ -180,7 +173,7 @@ public class KGroupedStreamImplTest {
     }
 
     private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) throws Exception {
-        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
+        driver.setUp(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
         driver.setTime(15);
@@ -260,7 +253,7 @@ public class KGroupedStreamImplTest {
     }
 
     private void doCountSessionWindows(final Map<Windowed<String>, Long> results) throws Exception {
-        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
+        driver.setUp(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
         driver.setTime(15);
@@ -308,7 +301,7 @@ public class KGroupedStreamImplTest {
     }
 
     private void doReduceSessionWindows(final Map<Windowed<String>, String> results) throws Exception {
-        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
+        driver.setUp(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "A");
         driver.setTime(15);
@@ -485,7 +478,7 @@ public class KGroupedStreamImplTest {
     }
 
     private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) throws Exception {
-        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), 0);
+        driver.setUp(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(0);
         driver.process(TOPIC, "1", "A");
         driver.process(TOPIC, "2", "B");

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 5dce4c6..9204b88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -32,8 +32,8 @@ import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -48,7 +48,8 @@ public class KGroupedTableImplTest {
     private final StreamsBuilder builder = new StreamsBuilder();
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void before() {
@@ -56,14 +57,6 @@ public class KGroupedTableImplTest {
                 .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void shouldAllowNullStoreNameOnCount()  {
         groupedTable.count((String) null);
@@ -128,7 +121,7 @@ public class KGroupedTableImplTest {
             }
         });
 
-        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
         driver.setTime(10L);
         driver.process(topic, "A", 1.1);
         driver.process(topic, "B", 2.2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index b199c34..03401b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.lang.reflect.Array;
@@ -33,15 +33,8 @@ public class KStreamBranchTest {
 
     private String topicName = "topic";
 
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @SuppressWarnings("unchecked")
     @Test
@@ -84,7 +77,7 @@ public class KStreamBranchTest {
             branches[i].process(processors[i]);
         }
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, "V" + expectedKey);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 199d8b2..4fc203b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -30,16 +30,9 @@ import static org.junit.Assert.assertEquals;
 public class KStreamFilterTest {
 
     private String topicName = "topic";
-
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+  
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
@@ -60,7 +53,7 @@ public class KStreamFilterTest {
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filter(isMultipleOfThree).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, "V" + expectedKey);
         }
@@ -80,7 +73,7 @@ public class KStreamFilterTest {
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filterNot(isMultipleOfThree).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, "V" + expectedKey);
         }
@@ -102,5 +95,6 @@ public class KStreamFilterTest {
             .filter(numberKeyPredicate)
             .filterNot(numberKeyPredicate)
             .to("nirvana");
+        
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 72fb547..efe7be0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -34,15 +34,8 @@ public class KStreamFlatMapTest {
 
     private String topicName = "topic";
 
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testFlatMap() {
@@ -69,7 +62,7 @@ public class KStreamFlatMapTest {
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.flatMap(mapper).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, "V" + expectedKey);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 48fd219..ccfec98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -33,15 +33,8 @@ public class KStreamFlatMapValuesTest {
 
     private String topicName = "topic";
 
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testFlatMapValues() {
@@ -67,7 +60,7 @@ public class KStreamFlatMapValuesTest {
         stream = builder.stream(Serdes.Integer(), Serdes.Integer(), topicName);
         stream.flatMapValues(mapper).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, expectedKey);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index b3a9eb1..a929c9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -39,16 +39,8 @@ public class KStreamForeachTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testForeach() {
@@ -82,7 +74,7 @@ public class KStreamForeachTest {
         stream.foreach(action);
 
         // Then
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 2a8b260..a87bf60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -48,17 +48,10 @@ public class KStreamKStreamJoinTest {
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
@@ -86,7 +79,7 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         // push two items to the primary stream. the other window is empty
@@ -184,7 +177,7 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         // push two items to the primary stream. the other window is empty.this should produce two items
@@ -285,7 +278,7 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
 
         // push two items to the primary stream. the other window is empty. this should produce no items.
@@ -512,7 +505,7 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         for (int i = 0; i < expectedKeys.length; i++) {
             setRecordContext(time + i, topic1);
@@ -621,7 +614,7 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         for (int i = 0; i < expectedKeys.length; i++) {
             setRecordContext(time + i, topic1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 35ee44c..410ac32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -47,24 +47,16 @@ public class KStreamKStreamLeftJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
 
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-
     @Test
     public void testLeftJoin() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -88,7 +80,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         // push two items to the primary stream. the other window is empty
@@ -178,7 +170,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         // push two items to the primary stream. the other window is empty. this should produce two items
         // w1 = {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 666a82d..f32147e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -25,8 +25,8 @@ import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -45,18 +45,10 @@ public class KStreamKTableJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
@@ -82,7 +74,7 @@ public class KStreamKTableJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         // push two items to the primary stream. the other table is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 2610c7e..8c4cc38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -25,8 +25,8 @@ import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -45,17 +45,10 @@ public class KStreamKTableLeftJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
 
     @Before
     public void setUp() throws IOException {
@@ -82,7 +75,7 @@ public class KStreamKTableLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         // push two items to the primary stream. the other table is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 4c29218..877fffa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,16 +35,8 @@ public class KStreamMapTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testMap() {
@@ -66,7 +58,7 @@ public class KStreamMapTest {
         processor = new MockProcessorSupplier<>();
         stream.map(mapper).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, "V" + expectedKey);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 9c3dfdd..34e73a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -34,16 +34,8 @@ public class KStreamMapValuesTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testFlatMapValues() {
@@ -64,7 +56,7 @@ public class KStreamMapValuesTest {
         stream = builder.stream(intSerde, stringSerde, topicName);
         stream.mapValues(mapper).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, Integer.toString(expectedKey));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index bf60a61..1dc9cd5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -23,7 +23,8 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.junit.After;
+
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -37,14 +38,8 @@ public class KStreamPeekTest {
     private final String topicName = "topic";
     private final Serde<Integer> intSerd = Serdes.Integer();
     private final Serde<String> stringSerd = Serdes.String();
-    private KStreamTestDriver driver = null;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void shouldObserveStreamElements() {
@@ -53,7 +48,7 @@ public class KStreamPeekTest {
         final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
         stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         final List<KeyValue<Integer, String>> expected = new ArrayList<>();
         for (int key = 0; key < 32; key++) {
             final String value = "V" + key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index f548511..2813a08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -24,8 +24,9 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.junit.After;
+
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -45,8 +46,8 @@ public class KStreamPrintTest {
     private final Serde<String> stringSerd = Serdes.String();
     private PrintWriter printWriter;
     private ByteArrayOutputStream byteOutStream;
-    private KStreamTestDriver driver = null;
-
+    @Rule
+    public KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() {
@@ -54,13 +55,6 @@ public class KStreamPrintTest {
         printWriter = new PrintWriter(new OutputStreamWriter(byteOutStream, StandardCharsets.UTF_8));
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-    }
-    
     @Test
     public void testPrintKeyValueWithName() {
         KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
@@ -83,7 +77,7 @@ public class KStreamPrintTest {
         final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
         stream.process(kStreamPrint);
         
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }
@@ -116,7 +110,7 @@ public class KStreamPrintTest {
         final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
         stream.process(kStreamPrint);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 5d9f6c4..d1d785d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -38,16 +38,8 @@ public class KStreamSelectKeyTest {
 
     final private Serde<Integer> integerSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testSelectKey() {
@@ -75,7 +67,7 @@ public class KStreamSelectKeyTest {
 
         stream.selectKey(selector).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
 
         for (int expectedValue : expectedValues) {
             driver.process(topicName, null, expectedValue);

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index c3573f3..d191891 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -36,16 +36,8 @@ public class KStreamTransformTest {
     private String topicName = "topic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
-
-    private KStreamTestDriver driver;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testTransform() {
@@ -86,7 +78,7 @@ public class KStreamTransformTest {
         KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
         stream.transform(transformerSupplier).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, expectedKey * 10);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index def79ed..701a3d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -38,16 +38,8 @@ public class KStreamTransformValuesTest {
     private String topicName = "topic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
-
-    private KStreamTestDriver driver;
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
     public void testTransform() {
@@ -89,7 +81,7 @@ public class KStreamTransformValuesTest {
         stream = builder.stream(intSerde, intSerde, topicName);
         stream.transformValues(valueTransformerSupplier).process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver.setUp(builder);
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, expectedKey * 10);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 181e8cf..3c5a37d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -32,8 +32,8 @@ import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -44,24 +44,15 @@ import static org.junit.Assert.assertEquals;
 public class KStreamWindowAggregateTest {
 
     final private Serde<String> strSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
     private File stateDir = null;
-
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    @After
-    public void tearDown() throws Exception {
-        if (driver != null) {
-            driver.close();
-            driver = null;
-        }
-        Utils.delete(stateDir);
-    }
-
     @Test
     public void testAggBasic() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -79,7 +70,7 @@ public class KStreamWindowAggregateTest {
         MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         setRecordContext(0, topic1);
         driver.process(topic1, "A", "1");
@@ -191,7 +182,7 @@ public class KStreamWindowAggregateTest {
             }
         }).toStream().process(proc3);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         setRecordContext(0, topic1);
         driver.process(topic1, "A", "1");

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index f4ad346..d7360ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -52,19 +51,12 @@ public class KTableAggregateTest {
 
     final private Serde<String> stringSerde = Serdes.String();
 
-    private KStreamTestDriver driver = null;
     private File stateDir = null;
 
     @Rule
     public EmbeddedKafkaCluster cluster = null;
-
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() throws IOException {
@@ -89,7 +81,7 @@ public class KTableAggregateTest {
 
         table2.toStream().process(proc);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         driver.process(topic1, "A", "1");
         driver.flushState();
@@ -138,7 +130,7 @@ public class KTableAggregateTest {
 
         table2.toStream().process(proc);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         driver.process(topic1, "A", "1");
         driver.process(topic1, "A", "3");
@@ -180,7 +172,7 @@ public class KTableAggregateTest {
 
         table2.toStream().process(proc);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         driver.process(topic1, "A", "1");
         driver.flushState();
@@ -212,7 +204,7 @@ public class KTableAggregateTest {
     }
 
     private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) throws IOException {
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         driver.process(input, "A", "green");
         driver.flushState();
@@ -278,7 +270,7 @@ public class KTableAggregateTest {
             .toStream()
             .process(proc);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         driver.process(input, "A", "green");
         driver.process(input, "B", "green");
@@ -331,7 +323,7 @@ public class KTableAggregateTest {
                 .toStream()
                 .process(proc);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         driver.process(input, "11", "A");
         driver.flushState();
@@ -400,7 +392,7 @@ public class KTableAggregateTest {
                     }
                 });
 
-        driver = new KStreamTestDriver(builder, stateDir, 111);
+        driver.setUp(builder, stateDir, 111);
         driver.process(reduceTopic, "1", new Change<>(1L, null));
         driver.process("tableOne", "2", "2");
         // this should trigger eviction on the reducer-store topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index a53f61e..d466295 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -26,8 +26,8 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -40,18 +40,10 @@ public class KTableFilterTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
@@ -64,7 +56,7 @@ public class KTableFilterTest {
         table2.toStream().process(proc2);
         table3.toStream().process(proc3);
 
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);
@@ -134,7 +126,7 @@ public class KTableFilterTest {
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
         KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
         KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
@@ -249,7 +241,7 @@ public class KTableFilterTest {
         builder.build().addProcessor("proc1", proc1, table1.name);
         builder.build().addProcessor("proc2", proc2, table2.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 1);
@@ -328,7 +320,7 @@ public class KTableFilterTest {
         builder.build().addProcessor("proc1", proc1, table1.name);
         builder.build().addProcessor("proc2", proc2, table2.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 1);
@@ -404,7 +396,7 @@ public class KTableFilterTest {
         builder.build().addProcessor("proc1", proc1, table1.name);
         builder.build().addProcessor("proc2", proc2, table2.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
+        driver.setUp(builder, stateDir, stringSerde, stringSerde);
 
         driver.process(topic1, "A", "reject");
         driver.process(topic1, "B", "reject");

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index b099241..d218d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -24,8 +24,8 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -43,22 +43,14 @@ public class KTableForeachTest {
     private File stateDir = null;
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void testForeach() {
         // Given
@@ -91,7 +83,7 @@ public class KTableForeachTest {
         table.foreach(action);
 
         // Then
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 63ea763..f791012 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -35,8 +35,8 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -51,20 +51,12 @@ import static org.junit.Assert.assertTrue;
 public class KTableImplTest {
 
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
     private StreamsBuilder builder;
     private KTable<String, String> table;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
@@ -111,7 +103,7 @@ public class KTableImplTest {
         MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
         table4.toStream().process(proc4);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         driver.process(topic1, "A", "01");
         driver.flushState();
@@ -162,7 +154,7 @@ public class KTableImplTest {
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
         KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir, null, null);
 
         // two state store should be created
         assertEquals(2, driver.allStateStores().size());
@@ -285,7 +277,7 @@ public class KTableImplTest {
                     }
                 });
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir, null, null);
         driver.setTime(0L);
 
         // two state stores should be created
@@ -328,7 +320,7 @@ public class KTableImplTest {
                     }
                 });
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir, null, null);
         driver.setTime(0L);
 
         // two state store should be created
@@ -352,7 +344,7 @@ public class KTableImplTest {
         table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
 
-        driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
+        driver.setUp(builder, stateDir, stringSerde, stringSerde);
         driver.setTime(0L);
 
         // three state store should be created, one for source, one for aggregate and one for reduce

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 668e7f3..dd14b55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -25,8 +25,8 @@ import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -51,17 +51,9 @@ public class KTableKTableJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
     private File stateDir = null;
-
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() throws IOException {
@@ -92,7 +84,7 @@ public class KTableKTableJoinTest {
 
         final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.Integer(), Serdes.String());
+        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         final KTableValueGetter<Integer, String> getter = getterSupplier.get();
@@ -225,7 +217,7 @@ public class KTableKTableJoinTest {
                                         final boolean sendOldValues) {
 
 
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.Integer(), Serdes.String());
+        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         if (!sendOldValues) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 7913902..5abf948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -55,17 +55,9 @@ public class KTableKTableLeftJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
     private File stateDir = null;
-
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() throws IOException {
@@ -92,7 +84,7 @@ public class KTableKTableLeftJoinTest {
 
         KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         KTableValueGetter<Integer, String> getter = getterSupplier.get();
@@ -184,7 +176,7 @@ public class KTableKTableLeftJoinTest {
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
@@ -267,7 +259,7 @@ public class KTableKTableLeftJoinTest {
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
@@ -380,7 +372,7 @@ public class KTableKTableLeftJoinTest {
                 .leftJoin(eight, MockValueJoiner.TOSTRING_JOINER)
                 .mapValues(mapper);
 
-        driver = new KStreamTestDriver(builder, stateDir, 250);
+        driver.setUp(builder, stateDir, 250);
 
         final String[] values = {"a", "AA", "BBB", "CCCC", "DD", "EEEEEEEE", "F", "GGGGGGGGGGGGGGG", "HHH", "IIIIIIIIII",
                                  "J", "KK", "LLLL", "MMMMMMMMMMMMMMMMMMMMMM", "NNNNN", "O", "P", "QQQQQ", "R", "SSSS",

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 1ea86dd..107e76a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -25,8 +25,8 @@ import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -50,17 +50,9 @@ public class KTableKTableOuterJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
     private File stateDir = null;
-
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
     public void setUp() throws IOException {
@@ -91,7 +83,7 @@ public class KTableKTableOuterJoinTest {
 
         KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         KTableValueGetter<Integer, String> getter = getterSupplier.get();
         getter.init(driver.context());
@@ -190,7 +182,7 @@ public class KTableKTableOuterJoinTest {
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
@@ -280,7 +272,7 @@ public class KTableKTableOuterJoinTest {
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
 
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 9c5955a..ba27e8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -26,8 +26,9 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -42,17 +43,10 @@ public class KTableMapKeysTest {
     final private Serde<String> stringSerde = new Serdes.StringSerde();
     final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
     private File stateDir = null;
-    private KStreamTestDriver driver = null;
-
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
 
+    
     @Before
      public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
@@ -90,7 +84,7 @@ public class KTableMapKeysTest {
 
         convertedStream.process(processor);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         for (int i = 0;  i < originalKeys.length; i++) {
             driver.process(topic1, originalKeys[i], values[i]);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 6bdf83d..6f44112 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -26,8 +26,8 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -41,25 +41,17 @@ import static org.junit.Assert.assertTrue;
 public class KTableMapValuesTest {
 
     final private Serde<String> stringSerde = Serdes.String();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
 
         driver.process(topic1, "A", "1");
         driver.process(topic1, "B", "2");
@@ -120,7 +112,7 @@ public class KTableMapValuesTest {
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
         KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
         KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
         KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
@@ -292,7 +284,7 @@ public class KTableMapValuesTest {
 
         builder.build().addProcessor("proc", proc, table2.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir);
         assertFalse(table1.sendingOldValueEnabled());
         assertFalse(table2.sendingOldValueEnabled());
 
@@ -342,7 +334,7 @@ public class KTableMapValuesTest {
 
         builder.build().addProcessor("proc", proc, table2.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir);
         assertTrue(table1.sendingOldValueEnabled());
         assertTrue(table2.sendingOldValueEnabled());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 77d2b19..0c808bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -24,8 +24,8 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
@@ -39,18 +39,10 @@ public class KTableSourceTest {
 
     final private Serde<String> stringSerde = Serdes.String();
     final private Serde<Integer> intSerde = Serdes.Integer();
-
-    private KStreamTestDriver driver = null;
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
 
-    @After
-    public void tearDown() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
@@ -67,7 +59,7 @@ public class KTableSourceTest {
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setUp(builder, stateDir);
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);
         driver.process(topic1, "C", 3);
@@ -90,7 +82,7 @@ public class KTableSourceTest {
 
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir);
         KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
 
@@ -136,7 +128,7 @@ public class KTableSourceTest {
 
         builder.build().addProcessor("proc1", proc1, table1.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir);
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
@@ -178,7 +170,7 @@ public class KTableSourceTest {
 
         builder.build().addProcessor("proc1", proc1, table1.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setUp(builder, stateDir);
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");


Mime
View raw message