kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Make streams quick start more interactive
Date Tue, 25 Jul 2017 18:34:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5d798511b -> 91c207c2c


MINOR: Make streams quick start more interactive

1. Make the WordCountDemo application to not stop automatically but via "ctrl-C".
2. Update the quickstart html file to let users type input messages one-by-one, and observe
added output in an interactive manner.
3. Some minor fixes on the parent documentation page pointing to streams sub-pages, added
a new recommended Scala version number.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Michael G. Noll <michael@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3515 from guozhangwang/KMinor-interactive-quickstart


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91c207c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91c207c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91c207c2

Branch: refs/heads/trunk
Commit: 91c207c2c6b09d88cc3366d69a31d0bf0ab0bffb
Parents: 5d79851
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Jul 25 11:34:16 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 25 11:34:16 2017 -0700

----------------------------------------------------------------------
 docs/js/templateData.js                         |   1 +
 docs/streams/quickstart.html                    | 192 ++++++++++++++-----
 docs/toc.html                                   |   9 +
 .../kafka/streams/examples/pipe/PipeDemo.java   |  25 ++-
 .../examples/wordcount/WordCountDemo.java       |  27 ++-
 .../wordcount/WordCountProcessorDemo.java       |  31 ++-
 6 files changed, 214 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/docs/js/templateData.js
----------------------------------------------------------------------
diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 3eca71e..50997bd 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -20,4 +20,5 @@ var context={
     "version": "0110",
     "dotVersion": "0.11.0",
     "fullDotVersion": "0.11.0.0"
+    "scalaVersion:" "2.11"
 };

http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/docs/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html
index 1c45e16..031a375 100644
--- a/docs/streams/quickstart.html
+++ b/docs/streams/quickstart.html
@@ -40,10 +40,10 @@ of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/stream
 final Serde&lt;String&gt; stringSerde = Serdes.String();
 final Serde&lt;Long&gt; longSerde = Serdes.Long();
 
-// Construct a `KStream` from the input topic ""streams-file-input", where message values
+// Construct a `KStream` from the input topic "streams-wordcount-input", where message values
 // represent lines of text (for the sake of this example, we ignore whatever may be stored
 // in the message keys).
-KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde,
"streams-file-input");
+KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde,
"streams-wordcount-input");
 
 KTable&lt;String, Long&gt; wordCounts = textLines
     // Split each text line, by whitespace, into words.
@@ -71,16 +71,18 @@ because it cannot know when it has processed "all" the input data.
 <p>
   As the first step, we will start Kafka (unless you already have it started) and then we
will
   prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams
application.
+</p>
 
-  <h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step
1: Download the code</a></h4>
+<h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step
1: Download the code</a></h4>
 
-<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_2.11-{{fullDotVersion}}.tgz"
title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
+<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz"
title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
+Note that there are multiple downloadable Scala versions and we choose to use the recommended
version ({{scalaVersion}}) here:
 
 <pre class="brush: bash;">
-&gt; tar -xzf kafka_2.11-{{fullDotVersion}}.tgz
-&gt; cd kafka_2.11-{{fullDotVersion}}
+&gt; tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+&gt; cd kafka_{{scalaVersion}}-{{fullDotVersion}}
 </pre>
-</p>
+
 <h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">Step
2: Start the Kafka server</a></h4>
 
 <p>
@@ -102,19 +104,9 @@ Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a>
so you need to
 </pre>
 
 
-<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step
3: Prepare data</a></h4>
+<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step
3: Prepare input topic and start Kafka producer</a></h4>
 
 <!--
-<pre>
-&gt; <b>./bin/kafka-topics --create \</b>
-            <b>--zookeeper localhost:2181 \</b>
-            <b>--replication-factor 1 \</b>
-            <b>--partitions 1 \</b>
-            <b>--topic streams-file-input</b>
-
-</pre>
-
--->
 
 <pre class="brush: bash;">
 &gt; echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" >
file-input.txt
@@ -126,41 +118,59 @@ Or on Windows:
 &gt; echo|set /p=join kafka summit>> file-input.txt
 </pre>
 
-<p>
-Next, we send this input data to the input topic named <b>streams-file-input</b>
using the console producer,
-which reads the data from STDIN line-by-line, and publishes each line as a separate Kafka
message with null key and value encoded a string to the topic (in practice,
-stream data will likely be flowing continuously into Kafka where the application will be
up and running):
-</p>
+-->
+
+Next, we create the input topic named <b>streams-wordcount-input</b> and the
output topic named <b>streams-wordcount-output</b>:
 
 <pre class="brush: bash;">
 &gt; bin/kafka-topics.sh --create \
     --zookeeper localhost:2181 \
     --replication-factor 1 \
     --partitions 1 \
-    --topic streams-file-input
+    --topic streams-wordcount-input
+Created topic "streams-wordcount-input".
+
+&gt; bin/kafka-topics.sh --create \
+    --zookeeper localhost:2181 \
+    --replication-factor 1 \
+    --partitions 1 \
+    --topic streams-wordcount-output
+Created topic "streams-wordcount-output".
 </pre>
 
+The created topic can be described with the same <b>kafka-topics</b> tool:
 
 <pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
< file-input.txt
+&gt; bin/kafka-topics.sh --zookeeper localhost:2181 --describe
+
+Topic:streams-wordcount-input	PartitionCount:1	ReplicationFactor:1	Configs:
+    Topic: streams-wordcount-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
+Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:
+	Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
 </pre>
 
-<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step
4: Process data</a></h4>
+<h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4:
Start the Wordcount Application</a></h4>
+
+The following command starts the WordCount demo application:
 
 <pre class="brush: bash;">
 &gt; bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
 </pre>
 
 <p>
-The demo application will read from the input topic <b>streams-file-input</b>,
perform the computations of the WordCount algorithm on each of the read messages,
+The demo application will read from the input topic <b>streams-wordcount-input</b>,
perform the computations of the WordCount algorithm on each of the read messages,
 and continuously write its current results to the output topic <b>streams-wordcount-output</b>.
 Hence there won't be any STDOUT output except log entries as the results are written back
into in Kafka.
-The demo will run for a few seconds and then, unlike typical stream processing applications,
terminate automatically.
-</p>
-<p>
-We can now inspect the output of the WordCount demo application by reading from its output
topic:
 </p>
 
+Now we can start the console producer in a separate terminal to write some input data to
this topic:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+</pre>
+
+and inspect the output of the WordCount demo application by reading from its output topic
with the console consumer in a separate terminal:
+
 <pre class="brush: bash;">
 &gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic streams-wordcount-output \
@@ -172,27 +182,115 @@ We can now inspect the output of the WordCount demo application by
reading from
     --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 </pre>
 
+
+<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step
5: Process some data</a></h4>
+
+Now let's write some message with the console producer into the input topic <b>streams-wordcount-input</b>
by entering a single line of text and then hit &lt;RETURN&gt;.
+This will send a new message to the input topic, where the message key is null and the message
value is the string encoded text line that you just entered
+(in practice, input data for applications will typically be streaming continuously into Kafka,
rather than being manually entered as we do in this quickstart):
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+</pre>
+
 <p>
-with the following output data being printed to the console:
+This message will be processed by the Wordcount application and the following output data
will be written to the <b>streams-wordcount-output</b> topic and printed by the
console consumer:
 </p>
 
 <pre class="brush: bash;">
-all     1
-lead    1
-to      1
-hello   1
-streams 2
-join    1
-kafka   3
-summit  1
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
\
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all	    1
+streams	1
+lead	1
+to	    1
+kafka	1
 </pre>
 
 <p>
-Here, the first column is the Kafka message key in <code>java.lang.String</code>
format, and the second column is the message value in <code>java.lang.Long</code>
format.
-Note that the output is actually a continuous stream of updates, where each data record (i.e.
each line in the original output above) is
-an updated count of a single word, aka record key such as "kafka". For multiple records with
the same key, each later record is an update of the previous one.
+Here, the first column is the Kafka message key in <code>java.lang.String</code>
format and represents a word that is being counted, and the second column is the message value
in <code>java.lang.Long</code>format, representing the word's latest count.
 </p>
 
+Now let's continue writing one more message with the console producer into the input topic
<b>streams-wordcount-input</b>.
+Enter the text line "hello kafka streams" and hit &lt;RETURN&gt;.
+Your terminal should look as follows:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+</pre>
+
+In your other terminal in which the console consumer is running, you will observe that the
WordCount application wrote new output data:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
\
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all	    1
+streams	1
+lead	1
+to	    1
+kafka	1
+hello	1
+kafka	2
+streams	2
+</pre>
+
+Here the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate
updates to the keys <b>kafka</b> and <b>streams</b> whose counts have
been incremented from <b>1</b> to <b>2</b>.
+Whenever you write further input messages to the input topic, you will observe new messages
being added to the <b>streams-wordcount-output</b> topic,
+representing the most recent word counts as computed by the WordCount application.
+Let's enter one final input text line "join kafka summit" and hit &lt;RETURN&gt;
in the console producer to the input topic <b>streams-wordcount-input</b> before
we wrap up this quickstart:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+join kafka summit
+</pre>
+
+The <b>streams-wordcount-output</b> topic will subsequently show the corresponding
updated word counts (see last three lines):
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
\
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all	    1
+streams	1
+lead	1
+to	    1
+kafka	1
+hello	1
+kafka	2
+streams	2
+join	1
+kafka	3
+summit	1
+</pre>
+
+As one can see, outputs of the Wordcount application is actually a continuous stream of updates,
where each output record (i.e. each line in the original output above) is
+an updated count of a single word, aka record key such as "kafka". For multiple records with
the same key, each later record is an update of the previous one.
+
 <p>
 The two diagrams below illustrate what is essentially happening behind the scenes.
 The first column shows the evolution of the current state of the <code>KTable&lt;String,
Long&gt;</code> that is counting word occurrences for <code>count</code>.
@@ -217,13 +315,9 @@ And so on (we skip the illustration of how the third line is being processed).
T
 Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to
leverage the duality between a table and a changelog stream (here: table = the KTable, changelog
stream = the downstream KStream): you can publish every change of the table to a stream, and
if you consume the entire changelog stream from beginning to end, you can reconstruct the
contents of the table.
 </p>
 
-<p>
-Now you can write more input messages to the <b>streams-file-input</b> topic
and observe additional messages added
-to <b>streams-wordcount-output</b> topic, reflecting updated word counts (e.g.,
using the console producer and the
-console consumer, as described above).
-</p>
+<h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: Teardown
the application</a></h4>
 
-<p>You can stop the console consumer via <b>Ctrl-C</b>.</p>
+<p>You can now stop the console consumer, the console producer, the Wordcount application,
the Kafka broker and the Zookeeper server in order via <b>Ctrl-C</b>.</p>
 
  <div class="pagination">
         <a href="/{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>

http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/docs/toc.html
----------------------------------------------------------------------
diff --git a/docs/toc.html b/docs/toc.html
index 7525b0f..2ec0129 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -141,6 +141,15 @@
                 <li><a href="#connect_development">8.3 Connector Development
Guide</a></li>
             </ul>
         </li>
+        <li><a href="/{{version}}/documentation/streams">9. Kafka Streams</a>
+            <ul>
+                <li><a href="/{{version}}/documentation/streams/quickstart">9.1
Play with a Streams Application</a></li>
+                <li><a href="/{{version}}/documentation/streams/developer-guide">9.2
Developer Guide</a></li>
+                <li><a href="/{{version}}/documentation/streams/core-concepts">9.3
Core Concepts</a></li>
+                <li><a href="/{{version}}/documentation/streams/architecture">9.4
Architecture</a></li>
+                <li><a href="/{{version}}/documentation/streams/upgrade-guide">9.5
Upgrade Guide and API Changes</a></li>
+            </ul>
+        </li>
     </ul>
 
 </script>

http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 86182a3..1d672b2 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -18,11 +18,13 @@ package org.apache.kafka.streams.examples.pipe;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the high-level KStream DSL, how to read data from a source (input)
topic and how to
@@ -51,13 +53,24 @@ public class PipeDemo {
 
         builder.stream("streams-file-input").to("streams-pipe-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
 
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data
is finite.
-        Thread.sleep(5000L);
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook")
{
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
 
-        streams.close();
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 03f8762..616fc48 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import java.util.Arrays;
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
@@ -60,7 +62,7 @@ public class WordCountDemo {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source = builder.stream("streams-file-input");
+        KStream<String, String> source = builder.stream("streams-wordcount-input");
 
         KTable<String, Long> counts = source
                 .flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -80,13 +82,24 @@ public class WordCountDemo {
         // need to override value serde to Long type
         counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
 
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data
is finite.
-        Thread.sleep(5000L);
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook")
{
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
 
-        streams.close();
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index eceddf0..0ff42a7 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KafkaStreams;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.state.Stores;
 
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
@@ -119,20 +121,31 @@ public class WordCountProcessorDemo {
 
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.addSource("Source", "streams-file-input");
+        builder.addSource("Source", "streams-wordcount-input");
 
         builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
         builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
"Process");
 
         builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data
is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook")
{
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }


Mime
View raw message