kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
Date Mon, 31 Jul 2017 22:29:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1844bf2b2 -> da2205578


http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 7550109..751fc97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -18,12 +18,12 @@ package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 
 import java.io.File;
 import java.util.Properties;
@@ -99,7 +99,7 @@ public class EosTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
 
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, Integer> data = builder.stream("data");
 
         data.to("echo");
@@ -179,7 +179,7 @@ public class EosTestClient extends SmokeTestUtil {
                 .to(stringSerde, longSerde, "cnt");
         }
 
-        return new KafkaStreams(builder, props);
+        return new KafkaStreams(builder.build(), props);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
index 661dcba..614ab4d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
@@ -23,10 +23,10 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +44,7 @@ public class ShutdownDeadlockTest {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "shouldNotDeadlock");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), topic);
 
         source.foreach(new ForeachAction<String, String>() {
@@ -53,7 +53,7 @@ public class ShutdownDeadlockTest {
                 throw new RuntimeException("KABOOM!");
             }
         });
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 263474c..c3820b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -19,12 +19,12 @@ package org.apache.kafka.streams.tests;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.TimeWindows;
@@ -104,7 +104,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
 
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
         KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
         source.to(stringSerde, intSerde, "echo");
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@@ -227,7 +227,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                     "cntByCnt"
         ).to(stringSerde, longSerde, "tagg");
 
-        final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
         streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 9fb83ba..554310c 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -20,10 +20,13 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -32,6 +35,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -83,6 +87,58 @@ public class KStreamTestDriver {
         initTopology(topology, topology.stateStores());
     }
 
+    public KStreamTestDriver(final StreamsBuilder builder) {
+        this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
+    }
+
+    public KStreamTestDriver(final StreamsBuilder builder, final File stateDir) {
+        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
+    }
+
+    public KStreamTestDriver(final StreamsBuilder builder, final File stateDir, final long cacheSize) {
+        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
+    }
+
+    public KStreamTestDriver(final StreamsBuilder builder,
+                             final File stateDir,
+                             final Serde<?> keySerde,
+                             final Serde<?> valSerde) {
+        this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
+    }
+
+    public KStreamTestDriver(final StreamsBuilder builder,
+                             final File stateDir,
+                             final Serde<?> keySerde,
+                             final Serde<?> valSerde,
+                             final long cacheSize) {
+        // TODO: we should refactor this to avoid usage of reflection
+        final InternalTopologyBuilder internalTopologyBuilder;
+        try {
+            final Field internalStreamsBuilderField = builder.getClass().getDeclaredField("internalStreamsBuilder");
+            internalStreamsBuilderField.setAccessible(true);
+            final InternalStreamsBuilder internalStreamsBuilder = (InternalStreamsBuilder) internalStreamsBuilderField.get(builder);
+
+            final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
+            internalTopologyBuilderField.setAccessible(true);
+            internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(internalStreamsBuilder);
+        } catch (final NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+
+        internalTopologyBuilder.setApplicationId("TestDriver");
+        topology = internalTopologyBuilder.build(null);
+        globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
+        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+        // init global topology first as it will add stores to the
+        // store map that are required for joins etc.
+        if (globalTopology != null) {
+            initTopology(globalTopology, globalTopology.globalStateStores());
+        }
+        initTopology(topology, topology.stateStores());
+    }
+
     private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
         for (final StateStore store : stores) {
             store.init(context, store);

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 55f8728..4c21e54 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,11 +33,11 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
@@ -67,9 +67,9 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
 - * This class makes it easier to write tests to verify the behavior of topologies created with a {@link TopologyBuilder}.
 + * This class makes it easier to write tests to verify the behavior of topologies created with a {@link Topology}.
  * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors,
 - * and sinks. And because it starts with a {@link TopologyBuilder}, you can create topologies specific to your tests or you
 + * and sinks. And because it starts with a {@link Topology}, you can create topologies specific to your tests or you
  * can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real
  * Kafka broker, so the tests execute very quickly with very little overhead.
  * <p>


Mime
View raw message