kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3623: KStreamTestDriver extends ExternalResource
Date Tue, 01 Aug 2017 23:16:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 228a4fdb6 -> edcefccfd


http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 554310c..09200f2 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.junit.rules.ExternalResource;
 
 import java.io.File;
 import java.lang.reflect.Field;
@@ -41,38 +42,38 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class KStreamTestDriver {
+public class KStreamTestDriver extends ExternalResource {
 
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
 
-    private final ProcessorTopology topology;
-    private final MockProcessorContext context;
-    private final ProcessorTopology globalTopology;
+    private ProcessorTopology topology;
+    private MockProcessorContext context;
+    private ProcessorTopology globalTopology;
 
-    public KStreamTestDriver(final KStreamBuilder builder) {
-        this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
+    public void setUp(final KStreamBuilder builder) {
+        setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    public KStreamTestDriver(final KStreamBuilder builder, final File stateDir) {
-        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
+    public void setUp(final KStreamBuilder builder, final File stateDir) {
+        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    public KStreamTestDriver(final KStreamBuilder builder, final File stateDir, final long cacheSize) {
-        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
+    public void setUp(final KStreamBuilder builder, final File stateDir, final long cacheSize) {
+        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
     }
 
-    public KStreamTestDriver(final KStreamBuilder builder,
-                             final File stateDir,
-                             final Serde<?> keySerde,
-                             final Serde<?> valSerde) {
-        this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
+    public void setUp(final KStreamBuilder builder,
+                      final File stateDir,
+                      final Serde<?> keySerde,
+                      final Serde<?> valSerde) {
+        setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
     }
 
-    public KStreamTestDriver(final KStreamBuilder builder,
-                             final File stateDir,
-                             final Serde<?> keySerde,
-                             final Serde<?> valSerde,
-                             final long cacheSize) {
+    public void setUp(final KStreamBuilder builder,
+                      final File stateDir,
+                      final Serde<?> keySerde,
+                      final Serde<?> valSerde,
+                      final long cacheSize) {
         builder.setApplicationId("TestDriver");
         topology = builder.build(null);
         globalTopology = builder.buildGlobalStateTopology();
@@ -87,30 +88,30 @@ public class KStreamTestDriver {
         initTopology(topology, topology.stateStores());
     }
 
-    public KStreamTestDriver(final StreamsBuilder builder) {
-        this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
+    public void setUp(final StreamsBuilder builder) {
+        setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    public KStreamTestDriver(final StreamsBuilder builder, final File stateDir) {
-        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
+    public void setUp(final StreamsBuilder builder, final File stateDir) {
+        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    public KStreamTestDriver(final StreamsBuilder builder, final File stateDir, final long cacheSize) {
-        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
+    public void setUp(final StreamsBuilder builder, final File stateDir, final long cacheSize) {
+        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
     }
 
-    public KStreamTestDriver(final StreamsBuilder builder,
-                             final File stateDir,
-                             final Serde<?> keySerde,
-                             final Serde<?> valSerde) {
-        this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
+    public void setUp(final StreamsBuilder builder,
+                      final File stateDir,
+                      final Serde<?> keySerde,
+                      final Serde<?> valSerde) {
+        setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
     }
 
-    public KStreamTestDriver(final StreamsBuilder builder,
-                             final File stateDir,
-                             final Serde<?> keySerde,
-                             final Serde<?> valSerde,
-                             final long cacheSize) {
+    public void setUp(final StreamsBuilder builder,
+                      final File stateDir,
+                      final Serde<?> keySerde,
+                      final Serde<?> valSerde,
+                      final long cacheSize) {
         // TODO: we should refactor this to avoid usage of reflection
         final InternalTopologyBuilder internalTopologyBuilder;
         try {
@@ -138,6 +139,13 @@ public class KStreamTestDriver {
         }
         initTopology(topology, topology.stateStores());
     }
+    
+    @Override
+    protected void after() {
+        if (topology != null) {
+            close();
+        }
+    }
 
     private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
         for (final StateStore store : stores) {
@@ -217,8 +225,8 @@ public class KStreamTestDriver {
             }
         }
 
-        context.close();
         closeState();
+        context.close();
     }
 
     public Set<String> allProcessorNames() {


Mime
View raw message