kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: update streams quickstart for KIP-182
Date Wed, 04 Oct 2017 19:19:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ac7695c32 -> 93b71e7de


MINOR: update streams quickstart for KIP-182

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Michael G. Noll <michael@confluent.io>, Bill Bejeck <bill@confluent.io>,
Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3984 from dguy/quickstart-update


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93b71e7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93b71e7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93b71e7d

Branch: refs/heads/trunk
Commit: 93b71e7deeb9d5c705d654443147f90e45cb11c8
Parents: ac7695c
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Oct 4 12:19:40 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Oct 4 12:19:40 2017 -0700

----------------------------------------------------------------------
 docs/streams/quickstart.html                    |  7 ++++---
 docs/streams/tutorial.html                      | 20 ++++++++++++--------
 .../src/main/java/WordCount.java                | 18 ++++++++++++------
 3 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/93b71e7d/docs/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html
index ea59194..72fd952 100644
--- a/docs/streams/quickstart.html
+++ b/docs/streams/quickstart.html
@@ -44,7 +44,8 @@ final Serde&lt;Long&gt; longSerde = Serdes.Long();
 // Construct a `KStream` from the input topic "streams-plaintext-input", where message values
 // represent lines of text (for the sake of this example, we ignore whatever may be stored
 // in the message keys).
-KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input");
+KStream&lt;String, String&gt; textLines = builder.stream("streams-plaintext-input",
+    Consumed.with(stringSerde, stringSerde));
 
 KTable&lt;String, Long&gt; wordCounts = textLines
     // Split each text line, by whitespace, into words.
@@ -54,10 +55,10 @@ KTable&lt;String, Long&gt; wordCounts = textLines
     .groupBy((key, value) -> value)
 
     // Count the occurrences of each word (message key).
-    .count("Counts")
+    .count()
 
 // Store the running counts as a changelog stream to the output topic.
-wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
+wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 </pre>
 
 <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/93b71e7d/docs/streams/tutorial.html
----------------------------------------------------------------------
diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html
index 6476d07..f800681 100644
--- a/docs/streams/tutorial.html
+++ b/docs/streams/tutorial.html
@@ -485,12 +485,14 @@
                        return value;
                    }
                 })
-              .count("Counts");
+              // Materialize the result into a KeyValueStore named "counts-store".
+              // The Materialized store is always of type &lt;Bytes, byte[]&gt; as this is the format of the inner most store.
+              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt; as("counts-store"));
     </pre>
 
     <p>
-        Note that the <code>count</code> operator has a <code>String</code> typed parameter <code>Counts</code>,
-        which stores the running counts that keep being updated as more records are piped and processed from the source Kafka topic.
+        Note that the <code>count</code> operator has a <code>Materialized</code> parameter that specifies that the
+        running count should be stored in a state store named <code>counts-store</code>.
         This <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
     </p>
 
@@ -502,7 +504,7 @@
     </p>
 
     <pre class="brush: java;">
-        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
     </pre>
 
     <p>
@@ -516,8 +518,9 @@
         KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
         source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
               .groupBy((key, value) -> value)
-              .count("Counts")
-              .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"))
+              .toStream()
+              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
     </pre>
 
     <p>
@@ -589,8 +592,9 @@
                 KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
                 source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                       .groupBy((key, value) -> value)
-                      .count("Counts")
-                      .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+                      .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"))
+                      .toStream()
+                      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
                 final Topology topology = builder.build();
                 final KafkaStreams streams = new KafkaStreams(topology, props);

http://git-wip-us.apache.org/repos/asf/kafka/blob/93b71e7d/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 6dafa8c..020eb03 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -17,12 +17,16 @@
 package ${package};
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Arrays;
 import java.util.Locale;
@@ -59,17 +63,19 @@ public class WordCount {
                        return value;
                    }
                 })
-               .count("Counts")
-               .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
+               .toStream()
+               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
 
-        /* ------- use the code below for Java 8 and uncomment the above ----
+        /* ------- use the code below for Java 8 and comment the above ----
 
-        builder.stream("streams-plaintext-input")
+        builder.<String, String>stream("streams-plaintext-input")
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
-               .count("Counts")
-               .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
+               .toStream()
+               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
            ----------------------------------------------------------------- */
 


Mime
View raw message