kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [8/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
Date Mon, 31 Jul 2017 22:29:11 GMT
KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3602 from mjsax/kafka-5671-add-streamsbuilder


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da220557
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da220557
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da220557

Branch: refs/heads/trunk
Commit: da2205578be3228ce40eb5e59f6bbcb0c8da0aba
Parents: 1844bf2
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Jul 31 15:28:59 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jul 31 15:28:59 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |   18 +-
 docs/streams/index.html                         |    6 +-
 .../examples/pageview/PageViewTypedDemo.java    |    8 +-
 .../examples/pageview/PageViewUntypedDemo.java  |    8 +-
 .../kafka/streams/examples/pipe/PipeDemo.java   |    6 +-
 .../examples/wordcount/WordCountDemo.java       |    6 +-
 .../kafka/streams/KafkaClientSupplier.java      |    2 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   24 +-
 .../apache/kafka/streams/StreamsBuilder.java    | 1211 ++++++++++++++++++
 .../org/apache/kafka/streams/StreamsConfig.java |    2 +-
 .../java/org/apache/kafka/streams/Topology.java |    6 +
 .../kafka/streams/kstream/GlobalKTable.java     |    5 +-
 .../apache/kafka/streams/kstream/KStream.java   |   18 +-
 .../kafka/streams/kstream/KStreamBuilder.java   |  209 +--
 .../apache/kafka/streams/kstream/KTable.java    |   54 +-
 .../kstream/internals/AbstractStream.java       |   15 +-
 .../internals/InternalStreamsBuilder.java       |  203 +++
 .../kstream/internals/KGroupedStreamImpl.java   |   30 +-
 .../kstream/internals/KGroupedTableImpl.java    |   25 +-
 .../streams/kstream/internals/KStreamImpl.java  |  461 +++----
 .../kstream/internals/KStreamJoinWindow.java    |    6 +-
 .../streams/kstream/internals/KTableImpl.java   |  188 +--
 .../streams/processor/PartitionGrouper.java     |    2 +-
 .../streams/processor/TopologyBuilder.java      |   63 +-
 .../internals/StreamPartitionAssignor.java      |    7 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |   57 +-
 .../kafka/streams/StreamsBuilderTest.java       |  123 ++
 .../org/apache/kafka/streams/TopologyTest.java  |    1 +
 .../streams/integration/EosIntegrationTest.java |   14 +-
 .../integration/FanoutIntegrationTest.java      |    6 +-
 .../GlobalKTableIntegrationTest.java            |    8 +-
 .../InternalTopicIntegrationTest.java           |   14 +-
 .../integration/JoinIntegrationTest.java        |    8 +-
 .../KStreamAggregationDedupIntegrationTest.java |   13 +-
 .../KStreamAggregationIntegrationTest.java      |    8 +-
 .../KStreamKTableJoinIntegrationTest.java       |    6 +-
 .../integration/KStreamRepartitionJoinTest.java |    9 +-
 ...eamsFineGrainedAutoResetIntegrationTest.java |   25 +-
 .../KTableKTableJoinIntegrationTest.java        |    6 +-
 .../QueryableStateIntegrationTest.java          |   32 +-
 .../integration/RegexSourceIntegrationTest.java |   37 +-
 .../integration/ResetIntegrationTest.java       |   15 +-
 .../streams/kstream/KStreamBuilderTest.java     |    9 +-
 .../kstream/internals/AbstractStreamTest.java   |   16 +-
 .../internals/GlobalKTableJoinsTest.java        |    4 +-
 .../internals/InternalStreamsBuilderTest.java   |  370 ++++++
 .../internals/KGroupedStreamImplTest.java       |   24 +-
 .../internals/KGroupedTableImplTest.java        |    4 +-
 .../kstream/internals/KStreamBranchTest.java    |    7 +-
 .../kstream/internals/KStreamFilterTest.java    |    8 +-
 .../kstream/internals/KStreamFlatMapTest.java   |   10 +-
 .../internals/KStreamFlatMapValuesTest.java     |    4 +-
 .../kstream/internals/KStreamForeachTest.java   |   13 +-
 .../kstream/internals/KStreamImplTest.java      |   29 +-
 .../internals/KStreamKStreamJoinTest.java       |   22 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |   10 +-
 .../internals/KStreamKTableJoinTest.java        |    6 +-
 .../internals/KStreamKTableLeftJoinTest.java    |    6 +-
 .../kstream/internals/KStreamMapTest.java       |    6 +-
 .../kstream/internals/KStreamMapValuesTest.java |    4 +-
 .../kstream/internals/KStreamPeekTest.java      |    7 +-
 .../kstream/internals/KStreamPrintTest.java     |   13 +-
 .../kstream/internals/KStreamSelectKeyTest.java |    6 +-
 .../kstream/internals/KStreamTransformTest.java |    6 +-
 .../internals/KStreamTransformValuesTest.java   |    4 +-
 .../internals/KStreamWindowAggregateTest.java   |    6 +-
 .../kstream/internals/KTableAggregateTest.java  |   21 +-
 .../kstream/internals/KTableFilterTest.java     |   46 +-
 .../kstream/internals/KTableForeachTest.java    |   18 +-
 .../kstream/internals/KTableImplTest.java       |   50 +-
 .../kstream/internals/KTableKTableJoinTest.java |   38 +-
 .../internals/KTableKTableLeftJoinTest.java     |   16 +-
 .../internals/KTableKTableOuterJoinTest.java    |   14 +-
 .../kstream/internals/KTableMapKeysTest.java    |    5 +-
 .../kstream/internals/KTableMapValuesTest.java  |   22 +-
 .../kstream/internals/KTableSourceTest.java     |   14 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |   26 +-
 .../kafka/streams/perf/YahooBenchmark.java      |   10 +-
 .../internals/GlobalStreamThreadTest.java       |    2 +-
 .../processor/internals/StandbyTaskTest.java    |   15 +-
 .../internals/StreamPartitionAssignorTest.java  |   28 +-
 .../processor/internals/StreamThreadTest.java   |  185 +--
 .../internals/StreamsMetadataStateTest.java     |   19 +-
 .../streams/tests/BrokerCompatibilityTest.java  |    6 +-
 .../kafka/streams/tests/EosTestClient.java      |    6 +-
 .../streams/tests/ShutdownDeadlockTest.java     |    6 +-
 .../kafka/streams/tests/SmokeTestClient.java    |    6 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   56 +
 .../kafka/test/ProcessorTopologyTestDriver.java |    8 +-
 89 files changed, 3119 insertions(+), 1057 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 76c2dc7..f24b17f 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -294,7 +294,7 @@
     </p>
 
     <pre class="brush: java;">
-    KStreamBuilder builder = new KStreamBuilder();
+    StreamsBuilder builder = new StreamsBuilder();
 
     KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
     KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
@@ -571,7 +571,7 @@
 
     <pre class="brush: java;">
           StreamsConfig config = ...;
-          KStreamBuilder builder = ...;
+          StreamsBuilder builder = ...;
           KStream&lt;String, String&gt; textLines = ...;
 
           // Define the processing topology (here: WordCount)
@@ -583,7 +583,7 @@
           groupedByWord.count("CountsKeyValueStore");
 
           // Start an instance of the topology
-          KafkaStreams streams = new KafkaStreams(builder, config);
+          KafkaStreams streams = new KafkaStreams(builder.build(), config);
           streams.start();
         </pre>
 
@@ -627,7 +627,7 @@
 
     <pre class="brush: java;">
           StreamsConfig config = ...;
-          KStreamBuilder builder = ...;
+          StreamsBuilder builder = ...;
           KStream&lt;String, String&gt; textLines = ...;
 
           // Define the processing topology (here: WordCount)
@@ -850,7 +850,7 @@
           // ... further settings may follow here ...
 
           StreamsConfig config = new StreamsConfig(props);
-          KStreamBuilder builder = new KStreamBuilder();
+          StreamsBuilder builder = new StreamsBuilder();
 
           KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "word-count-input");
 
@@ -863,7 +863,7 @@
           groupedByWord.count("word-count");
 
           // Start an instance of the topology
-          KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+          KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
           streams.start();
 
           // Then, create and start the actual RPC service for remote access to this
@@ -1082,8 +1082,8 @@
 
     <pre class="brush: java;">
     import org.apache.kafka.streams.KafkaStreams;
+    import org.apache.kafka.streams.StreamsBuilder;
     import org.apache.kafka.streams.StreamsConfig;
-    import org.apache.kafka.streams.kstream.KStreamBuilder;
     import org.apache.kafka.streams.Topology;
 
     // Use the builders to define the actual processing topology, e.g. to specify
@@ -1094,8 +1094,8 @@
     //
     // OR
     //
-    KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
-    Topology topology = builder.topology();
+    StreamsBuilder builder = ...;  // when using the Kafka Streams DSL
+    Topology topology = builder.build();
 
     // Use the configuration to tell your application where the Kafka cluster is,
     // which serializers/deserializers to use by default, to specify security settings,

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/docs/streams/index.html
----------------------------------------------------------------------
diff --git a/docs/streams/index.html b/docs/streams/index.html
index bcaa831..5a6da8e 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -75,9 +75,9 @@
             <pre class="brush: java;">
                 import org.apache.kafka.common.serialization.Serdes;
                 import org.apache.kafka.streams.KafkaStreams;
+                import org.apache.kafka.streams.StreamsBuilder;
                 import org.apache.kafka.streams.StreamsConfig;
                 import org.apache.kafka.streams.kstream.KStream;
-                import org.apache.kafka.streams.kstream.KStreamBuilder;
                 import org.apache.kafka.streams.kstream.KTable;
 
                 import java.util.Arrays;
@@ -100,7 +100,7 @@
                             .count("Counts");
                         wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
 
-                        KafkaStreams streams = new KafkaStreams(builder, config);
+                        KafkaStreams streams = new KafkaStreams(builder.build(), config);
                         streams.start();
                     }
 
@@ -112,9 +112,9 @@
             <pre class="brush: java;">
                 import org.apache.kafka.common.serialization.Serdes;
                 import org.apache.kafka.streams.KafkaStreams;
+                import org.apache.kafka.streams.StreamsBuilder;
                 import org.apache.kafka.streams.StreamsConfig;
                 import org.apache.kafka.streams.kstream.KStream;
-                import org.apache.kafka.streams.kstream.KStreamBuilder;
                 import org.apache.kafka.streams.kstream.KTable;
                 import org.apache.kafka.streams.kstream.KeyValueMapper;
                 import org.apache.kafka.streams.kstream.ValueMapper;

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 74afa8d..0783c65 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -23,14 +23,14 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -89,7 +89,7 @@ public class PageViewTypedDemo {
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         // TODO: the following can be removed with a serialization factory
         Map<String, Object> serdeProps = new HashMap<>();
@@ -190,7 +190,7 @@ public class PageViewTypedDemo {
         // write to the result topic
         regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
 
         // usually the stream application would be running forever,

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 1a90f68..e930985 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -24,12 +24,12 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.connect.json.JsonSerializer;
 import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -65,7 +65,7 @@ public class PageViewUntypedDemo {
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
@@ -121,7 +121,7 @@ public class PageViewUntypedDemo {
         // write to the result topic
         regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
 
         // usually the stream application would be running forever,

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 1d672b2..0831e3b 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.examples.pipe;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Properties;
@@ -49,11 +49,11 @@ public class PipeDemo {
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         builder.stream("streams-file-input").to("streams-pipe-output");
 
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
 
         // attach shutdown handler to catch control-c

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 616fc48..e3cf60c 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -60,7 +60,7 @@ public class WordCountDemo {
         // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         KStream<String, String> source = builder.stream("streams-wordcount-input");
 
@@ -82,7 +82,7 @@ public class WordCountDemo {
         // need to override value serde to Long type
         counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
 
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
 
         // attach shutdown handler to catch control-c

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index f733c25..260d58a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -26,7 +26,7 @@ import java.util.Map;
 /**
  * {@code KafkaClientSupplier} can be used to provide custom Kafka clients to a {@link KafkaStreams} instance.
  *
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, StreamsConfig, KafkaClientSupplier)
+ * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig, KafkaClientSupplier)
  */
 public interface KafkaClientSupplier {
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d7c608a..ec09730 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -33,13 +33,11 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -89,8 +87,9 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
  * sends output to zero, one, or more output topics.
  * <p>
- * The computational logic can be specified either by using the {@link TopologyBuilder} to define a DAG topology of
- * {@link Processor}s or by using the {@link KStreamBuilder} which provides the high-level DSL to define transformations.
+ * The computational logic can be specified either by using the {@link Topology} to define a DAG topology of
+ * {@link Processor}s or by using the {@link StreamsBuilder} which provides the high-level DSL to define
+ * transformations.
  * <p>
  * One {@code KafkaStreams} instance can contain one or more threads specified in the configs for the processing work.
  * <p>
@@ -121,8 +120,8 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
  * streams.start();
  * }</pre>
  *
- * @see KStreamBuilder
- * @see Topology
+ * @see org.apache.kafka.streams.StreamsBuilder
+ * @see org.apache.kafka.streams.Topology
  */
 @InterfaceStability.Evolving
 public class KafkaStreams {
@@ -145,7 +144,6 @@ public class KafkaStreams {
     private final String logPrefix;
     private final StreamsMetadataState streamsMetadataState;
     private final StreamsConfig config;
-    private StateRestoreListener stateRestoreListener;
     private final StateDirectory stateDirectory;
 
     // container states
@@ -406,7 +404,7 @@ public class KafkaStreams {
      * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
      */
     @Deprecated
-    public KafkaStreams(final TopologyBuilder builder,
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final Properties props) {
         this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
@@ -415,7 +413,7 @@ public class KafkaStreams {
      * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
      */
     @Deprecated
-    public KafkaStreams(final TopologyBuilder builder,
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final StreamsConfig config) {
         this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
     }
@@ -424,7 +422,7 @@ public class KafkaStreams {
      * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead
      */
     @Deprecated
-    public KafkaStreams(final TopologyBuilder builder,
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
         this(builder.internalTopologyBuilder, config, clientSupplier);
@@ -739,7 +737,7 @@ public class KafkaStreams {
     /**
      * Produce a string representation containing useful information about this {@code KafkaStream} instance such as
      * thread IDs, task IDs, and a representation of the topology DAG including {@link StateStore}s (cf.
-     * {@link TopologyBuilder} and {@link KStreamBuilder}).
+     * {@link Topology} and {@link StreamsBuilder}).
      *
      * @return A string representation of the Kafka Streams instance.
      */
@@ -751,7 +749,7 @@ public class KafkaStreams {
     /**
      * Produce a string representation containing useful information about this {@code KafkaStream} instance such as
      * thread IDs, task IDs, and a representation of the topology DAG including {@link StateStore}s (cf.
-     * {@link TopologyBuilder} and {@link KStreamBuilder}).
+     * {@link Topology} and {@link StreamsBuilder}).
      *
      * @param indent the top-level indent for each line
      * @return A string representation of the Kafka Streams instance.
@@ -882,7 +880,7 @@ public class KafkaStreams {
      * If a {@link StreamPartitioner custom partitioner} has been
      * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig},
      * {@link KStream#through(StreamPartitioner, String)}, or {@link KTable#through(StreamPartitioner, String, String)},
-     * or if the original {@link KTable}'s input {@link KStreamBuilder#table(String, String) topic} is partitioned
+     * or if the original {@link KTable}'s input {@link StreamsBuilder#table(String, String) topic} is partitioned
      * differently, please use {@link #metadataForKey(String, Object, StreamPartitioner)}.
      * <p>
      * Note:


Mime
View raw message