kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [03/14] kafka-site git commit: Update site for 1.0.0 release
Date Wed, 01 Nov 2017 12:58:43 GMT
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/cb024c13/100/streams/index.html
----------------------------------------------------------------------
diff --git a/100/streams/index.html b/100/streams/index.html
new file mode 100644
index 0000000..be2f5cc
--- /dev/null
+++ b/100/streams/index.html
@@ -0,0 +1,338 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+   -->
+<script>
+  <!--#include virtual="../js/templateData.js" -->
+</script>
+<script id="streams-template" type="text/x-handlebars-template">
+  <h1>Kafka Streams API</h1>
+       <div class="sub-nav-sticky">
+          <div class="sticky-top">
+             <div style="height:35px">
+                <a  class="active-menu-item" href="#">Introduction</a>
+                <a href="/{{version}}/documentation/streams/developer-guide">Developers Guide</a>
+                <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+                <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+                <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+             </div>
+           </div>
+       </div>
+       <h3 class="streams_intro">The easiest way to write mission-critical real-time applications and microservices</h3>
+       <p class="streams__description">Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.</p>
+       <div class="video__series__grid">
+          <div class="yt__video__block">
+            <div class="yt__video__inner__block">
+                <iframe  class="yt_series video_1 active" style="display:block" src="https://www.youtube.com/embed/Z3JKCLG3VP4?rel=0&showinfo=0&end=602" frameborder="0" allowfullscreen></iframe>
+                <iframe  class="yt_series video_2" src="https://www.youtube.com/embed/LxxeXI1mPKo?rel=0&showinfo=0&end=622" frameborder="0" allowfullscreen></iframe>
+                <iframe  class="yt_series video_3" src="https://www.youtube.com/embed/7JYEEx7SBuE?rel=0&showinfo=0&end=557" frameborder="0" allowfullscreen></iframe>
+                <iframe  class="yt_series video_4" src="https://www.youtube.com/embed/3kJgYIkAeHs?rel=0&showinfo=0&end=564" frameborder="0" allowfullscreen></iframe>
+             </div>
+            </div>
+            <div class="video__block">
+                <h3>TOUR OF THE STREAMS API</h3>
+                <div class="video__list">
+                   <p class="video__item video_list_1 active" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_1').show();">
+                       <span class="number">1</span><span class="video__text">Intro to Streams</span>
+                   </p>
+                   <p class="video__item video_list_2" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_2').show();">
+                       <span class="number">2</span><span class="video__text">Creating a Streams Application</span>
+                   </p>
+                   <p class="video__item video_list_3" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_3').show();">
+                       <span class="number">3</span><span class="video__text">Transforming Data Pt. 1</span>
+                   </p>
+                   <p class="video__item video_list_4" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_4').show();">
+                      <span class="number">4</span><span class="video__text">Transforming Data Pt. 2</span>
+                   </p>
+                </div>
+            </div>
+       </div>
+       <hr class="separator"> 
+       <div class="use-item-section">
+           <div class="use__list__sec">
+               <h3>Why you'll love using Kafka Streams!</h3>
+               <ul class="use-feature-list">
+                  <li>Elastic, highly scalable, fault-tolerant</li>
+                  <li>Deploy to containers, VMs, bare metal, cloud</li>
+                  <li>Equally viable for small, medium, &amp; large use cases</li>
+                  <li>Fully integrated with Kafka security</li>
+                  <li>Write standard Java applications</li>
+                  <li>Exactly-once processing semantics</li>
+                  <li>No separate processing cluster required</li>
+                  <li>Develop on Mac, Linux, Windows</li>
+                  
+               </ul>
+           </div>
+           <div class="first__app__cta">
+               <a href="/{{version}}/documentation/streams/tutorial" class="first__app__btn">Write your first app</a>
+           </div>
+       </div>
+       <hr class="separator"> 
+        <h3 class="stream__text">Streams API use cases</h3>
+         <div class="customers__grid">
+           <div class="customer__grid">
+             <div class="customer__item streams_logo_grid streams__ny__grid">
+               <a href="https://www.nytimes.com" target="_blank" class="grid__logo__link">
+                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/NYT.jpg');"></span>
+               </a>
+               <p class="grid__item__customer__description">
+                 <a href="https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/">The New York Times uses Apache Kafka </a>and the Kafka Streams API to store and distribute, in real-time, published content to the various applications and systems that make it available to the readers.
+               </p>
+             </div>
+           </div>
+           <div class="customer__grid">
+             <div class="customer__item  streams_logo_grid streams__line__grid">
+               <a href="https://linecorp.com/" target="_blank" class="grid__logo__link">
+                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/line.svg');width:9rem"></span>
+               </a>
+               <p class="grid__item__customer__description">LINE uses Apache Kafka as a central datahub for our services to communicate to one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing and data analysis. LINE leverages Kafka Streams to reliably transform and filter topics enabling sub topics consumers can efficiently consume, meanwhile retaining easy maintainability thanks to its sophisticated yet minimal code base.</p>
+             </div>
+           </div>
+           <div class="customer__grid">
+             <div class="customer__item  streams_logo_grid streams__zalando__grid">
+               <a href="http://www.zalando.com" target="_blank" class="grid__logo__link">
+                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/zalando.jpg');"></span>
+               </a>
+               <p class="grid__item__customer__description">As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing
+                 <a href="https://kafka-summit.org/sessions/using-kstreams-ktables-calculate-real-time-domain-rankings/" target="_blank"> event streams</a> enables our technical team to do near-real time business intelligence.
+               </p>
+             </div>
+           </div>
+           <div class="customer__grid">
+             <div class="customer__item  streams_logo_grid streams__rabobank__grid">
+               <a href="https://www.rabobank.com" target="_blank" class="grid__logo__link">
+                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/rabobank.jpg');"></span>
+               </a>
+               <p class="grid__item__customer__description">Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing amount of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real-time upon financial events and is built using Kafka Streams.</p>
+             </div>
+           </div>
+         </div>
+       <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
+       <p>The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.</p>
+       
+       <div class="code-example">
+           <div class="btn-group">
+               <a class="selected b-java-8" data-section="java-8">Java 8+</a>
+               <a class="b-java-7" data-section="java-7">Java 7</a>
+               <a class="b-scala" data-section="scala">Scala</a>
+           </div>
+       
+           <div class="code-example__snippet b-java-8 selected">
+               <pre class="brush: java;">
+                   import org.apache.kafka.common.serialization.Serdes;
+                   import org.apache.kafka.streams.KafkaStreams;
+                   import org.apache.kafka.streams.StreamsConfig;
+                   import org.apache.kafka.streams.kstream.KStream;
+                   import org.apache.kafka.streams.kstream.KStreamBuilder;
+                   import org.apache.kafka.streams.kstream.KTable;
+       
+                   import java.util.Arrays;
+                   import java.util.Properties;
+       
+                   public class WordCountApplication {
+       
+                       public static void main(final String[] args) throws Exception {
+                           Properties config = new Properties();
+                           config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
+                           config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+                           config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                           config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+       
+                           KStreamBuilder builder = new KStreamBuilder();
+                           KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
+                           KTable&lt;String, Long&gt; wordCounts = textLines
+                               .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
+                               .groupBy((key, word) -> word)
+                               .count("Counts");
+                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
+       
+                           KafkaStreams streams = new KafkaStreams(builder, config);
+                           streams.start();
+                       }
+       
+                   }
+               </pre>
+           </div>
+       
+           <div class="code-example__snippet b-java-7">
+               <pre class="brush: java;">
+                   import org.apache.kafka.common.serialization.Serdes;
+                   import org.apache.kafka.streams.KafkaStreams;
+                   import org.apache.kafka.streams.StreamsConfig;
+                   import org.apache.kafka.streams.kstream.KStream;
+                   import org.apache.kafka.streams.kstream.KStreamBuilder;
+                   import org.apache.kafka.streams.kstream.KTable;
+                   import org.apache.kafka.streams.kstream.KeyValueMapper;
+                   import org.apache.kafka.streams.kstream.ValueMapper;
+       
+                   import java.util.Arrays;
+                   import java.util.Properties;
+       
+                   public class WordCountApplication {
+       
+                       public static void main(final String[] args) throws Exception {
+                           Properties config = new Properties();
+                           config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
+                           config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+                           config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                           config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+       
+                           KStreamBuilder builder = new KStreamBuilder();
+                           KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
+                           KTable&lt;String, Long&gt; wordCounts = textLines
+                               .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                                   @Override
+                                   public Iterable&lt;String&gt; apply(String textLine) {
+                                       return Arrays.asList(textLine.toLowerCase().split("\\W+"));
+                                   }
+                               })
+                               .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
+                                   @Override
+                                   public String apply(String key, String word) {
+                                       return word;
+                                   }
+                               })
+                               .count("Counts");
+                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
+       
+                           KafkaStreams streams = new KafkaStreams(builder, config);
+                           streams.start();
+                       }
+       
+                   }
+               </pre>
+           </div>
+       
+           <div class="code-example__snippet b-scala">
+               <pre class="brush: scala;">
+                   import java.lang.Long
+                   import java.util.Properties
+                   import java.util.concurrent.TimeUnit
+       
+                   import org.apache.kafka.common.serialization._
+                   import org.apache.kafka.streams._
+                   import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
+       
+                   import scala.collection.JavaConverters.asJavaIterableConverter
+       
+                   object WordCountApplication {
+       
+                       def main(args: Array[String]) {
+                           val config: Properties = {
+                               val p = new Properties()
+                               p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
+                               p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
+                               p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
+                               p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
+                               p
+                           }
+       
+                           val builder: KStreamBuilder = new KStreamBuilder()
+                           val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
+                           val wordCounts: KTable[String, Long] = textLines
+                               .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
+                               .groupBy((_, word) => word)
+                               .count("Counts")
+                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")
+       
+                           val streams: KafkaStreams = new KafkaStreams(builder, config)
+                           streams.start()
+       
+                           Runtime.getRuntime.addShutdownHook(new Thread(() => {
+                               streams.close(10, TimeUnit.SECONDS)
+                           }))
+                       }
+       
+                   }
+               </pre>
+           </div>
+       </div>
+       
+       <div class="pagination">
+           <a href="#" class="pagination__btn pagination__btn__prev pagination__btn--disabled">Previous</a>
+           <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__next">Next</a>
+       </div>
+     
+</script>
+<!--#include virtual="../../includes/_header.htm" -->
+<!--#include virtual="../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+  <!--#include virtual="../../includes/_nav.htm" -->
+  <div class="right">
+    <!--#include virtual="../../includes/_docs_banner.htm" -->
+    <ul class="breadcrumbs">
+      <li><a href="/documentation">Documentation</a>
+      </li>
+    </ul>
+    <div class="p-streams"></div>
+  </div>
+</div>
+<!--#include virtual="../../includes/_footer.htm" -->
+<script>
+  $(function() {
+         
+         // Show selected style on nav item
+         $('.b-nav__streams').addClass('selected');
+    
+         $('.video_list_1').click(function(){    
+             $('.video_2').attr('src', $('.video_2').attr('src'));
+             $('.video_3').attr('src', $('.video_3').attr('src'));
+             $('.video_4').attr('src', $('.video_4').attr('src'));
+
+           });
+
+         $('.video_list_2').click(function(){    
+               $('.video_1').attr('src', $('.video_1').attr('src'));
+               $('.video_3').attr('src', $('.video_3').attr('src'));
+               $('.video_4').attr('src', $('.video_4').attr('src'));
+
+           });
+
+         $('.video_list_3').click(function(){    
+              $('.video_1').attr('src', $('.video_1').attr('src'));
+              $('.video_2').attr('src', $('.video_2').attr('src'));
+              $('.video_4').attr('src', $('.video_4').attr('src'));
+           });
+
+         $('.video_list_4').click(function(){    
+              $('.video_1').attr('src', $('.video_1').attr('src'));
+              $('.video_2').attr('src', $('.video_2').attr('src'));
+              $('.video_3').attr('src', $('.video_3').attr('src'));
+           });
+           
+
+          //sticky secondary nav
+          var $navbar = $(".sub-nav-sticky"),
+               y_pos = $navbar.offset().top,
+               height = $navbar.height();
+       
+           $(window).scroll(function() {
+               var scrollTop = $(window).scrollTop();
+           
+               if (scrollTop > y_pos - height) {
+                   $navbar.addClass("navbar-fixed")
+               } else if (scrollTop <= y_pos) {
+                   $navbar.removeClass("navbar-fixed")
+               }
+           });
+       
+         // Display docs subnav items
+         $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+         // Show selected code example
+         $('.btn-group a').click(function(){
+             var targetClass = '.b-' + $(this).data().section;
+             $('.code-example__snippet, .btn-group a').removeClass('selected');
+             $(targetClass).addClass('selected');
+         });
+       });
+</script>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/cb024c13/100/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/100/streams/quickstart.html b/100/streams/quickstart.html
new file mode 100644
index 0000000..c4b52ea
--- /dev/null
+++ b/100/streams/quickstart.html
@@ -0,0 +1,361 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<script><!--#include virtual="../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+
+  <h1>Run Streams Demo Application</h1>
+
+<p>
+  This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. However, if you have already started Kafka and
+  ZooKeeper, feel free to skip the first two steps.
+</p>
+
+  <p>
+ Kafka Streams is a client library for building mission-critical real-time applications and microservices,
+  where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of
+  writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's
+  server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed,
+ and much more.
+  </p>
+  <p>
+This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist
+of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java">WordCountDemo</a></code> example code (converted to use Java 8 lambda expressions for easy reading).
+</p>
+<pre class="brush: java;">
+// Serializers/deserializers (serde) for String and Long types
+final Serde&lt;String&gt; stringSerde = Serdes.String();
+final Serde&lt;Long&gt; longSerde = Serdes.Long();
+
+// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
+// represent lines of text (for the sake of this example, we ignore whatever may be stored
+// in the message keys).
+KStream&lt;String, String&gt; textLines = builder.stream("streams-plaintext-input",
+    Consumed.with(stringSerde, stringSerde));
+
+KTable&lt;String, Long&gt; wordCounts = textLines
+    // Split each text line, by whitespace, into words.
+    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+
+    // Group the text words as message keys
+    .groupBy((key, value) -> value)
+
+    // Count the occurrences of each word (message key).
+    .count();
+
+// Store the running counts as a changelog stream to the output topic.
+wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+</pre>
+
+<p>
+It implements the WordCount
+algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples
+you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is
+designed to operate on an <b>infinite, unbounded stream</b> of data. Similar to the bounded variant, it is a stateful algorithm that
+tracks and updates the counts of words. However, since it must assume potentially
+unbounded input data, it will periodically output its current state and results while continuing to process more data
+because it cannot know when it has processed "all" the input data.
+</p>
+<p>
+  As the first step, we will start Kafka (unless you already have it started) and then we will
+  prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.
+</p>
+
+<h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step 1: Download the code</a></h4>
+
+<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
+Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here:
+
+<pre class="brush: bash;">
+&gt; tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+&gt; cd kafka_{{scalaVersion}}-{{fullDotVersion}}
+</pre>
+
+<h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4>
+
+<p>
+Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a> so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
+</p>
+
+<pre class="brush: bash;">
+&gt; bin/zookeeper-server-start.sh config/zookeeper.properties
+[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
+...
+</pre>
+
+<p>Now start the Kafka server:</p>
+<pre class="brush: bash;">
+&gt; bin/kafka-server-start.sh config/server.properties
+[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
+[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
+...
+</pre>
+
+
+<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare input topic and start Kafka producer</a></h4>
+
+<!--
+
+<pre class="brush: bash;">
+&gt; echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
+</pre>
+Or on Windows:
+<pre class="brush: bash;">
+&gt; echo all streams lead to kafka> file-input.txt
+&gt; echo hello kafka streams>> file-input.txt
+&gt; echo|set /p=join kafka summit>> file-input.txt
+</pre>
+
+-->
+
+Next, we create the input topic named <b>streams-plaintext-input</b> and the output topic named <b>streams-wordcount-output</b>:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --create \
+    --zookeeper localhost:2181 \
+    --replication-factor 1 \
+    --partitions 1 \
+    --topic streams-plaintext-input
+Created topic "streams-plaintext-input".
+</pre>
+
+Note: we create the output topic with compaction enabled because the output stream is a changelog stream
+(cf. <a href="#anchor-changelog-output">explanation of application output</a> below).
+
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --create \
+    --zookeeper localhost:2181 \
+    --replication-factor 1 \
+    --partitions 1 \
+    --topic streams-wordcount-output \
+    --config cleanup.policy=compact
+Created topic "streams-wordcount-output".
+</pre>
+
+The created topics can be described with the same <b>kafka-topics</b> tool:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --zookeeper localhost:2181 --describe
+
+Topic:streams-plaintext-input	PartitionCount:1	ReplicationFactor:1	Configs:
+    Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
+Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:
+	Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
+</pre>
+
+<h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4: Start the Wordcount Application</a></h4>
+
+The following command starts the WordCount demo application:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
+</pre>
+
+<p>
+The demo application will read from the input topic <b>streams-plaintext-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
+and continuously write its current results to the output topic <b>streams-wordcount-output</b>.
+Hence there won't be any STDOUT output except log entries as the results are written back into Kafka.
+</p>
+
+Now we can start the console producer in a separate terminal to write some input data to this topic:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+</pre>
+
+and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+</pre>
+
+
+<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5: Process some data</a></h4>
+
+Now let's write a message with the console producer into the input topic <b>streams-plaintext-input</b> by entering a single line of text and then hitting &lt;RETURN&gt;.
+This will send a new message to the input topic, where the message key is null and the message value is the string encoded text line that you just entered
+(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+all streams lead to kafka
+</pre>
+
+<p>
+This message will be processed by the Wordcount application and the following output data will be written to the <b>streams-wordcount-output</b> topic and printed by the console consumer:
+</p>
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all	    1
+streams	1
+lead	1
+to	    1
+kafka	1
+</pre>
+
+<p>
+Here, the first column is the Kafka message key in <code>java.lang.String</code> format and represents a word that is being counted, and the second column is the message value in <code>java.lang.Long</code> format, representing the word's latest count.
+</p>
+
+Now let's continue writing one more message with the console producer into the input topic <b>streams-plaintext-input</b>.
+Enter the text line "hello kafka streams" and hit &lt;RETURN&gt;.
+Your terminal should look as follows:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+all streams lead to kafka
+hello kafka streams
+</pre>
+
+In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all	    1
+streams	1
+lead	1
+to	    1
+kafka	1
+hello	1
+kafka	2
+streams	2
+</pre>
+
+Here the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate updates to the keys <b>kafka</b> and <b>streams</b> whose counts have been incremented from <b>1</b> to <b>2</b>.
+Whenever you write further input messages to the input topic, you will observe new messages being added to the <b>streams-wordcount-output</b> topic,
+representing the most recent word counts as computed by the WordCount application.
+Let's enter one final input text line "join kafka summit" and hit &lt;RETURN&gt; in the console producer to the input topic <b>streams-plaintext-input</b> before we wrap up this quickstart:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+all streams lead to kafka
+hello kafka streams
+join kafka summit
+</pre>
+
+<a name="anchor-changelog-output"></a>
+The <b>streams-wordcount-output</b> topic will subsequently show the corresponding updated word counts (see last three lines):
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all	    1
+streams	1
+lead	1
+to	    1
+kafka	1
+hello	1
+kafka	2
+streams	2
+join	1
+kafka	3
+summit	1
+</pre>
+
+As one can see, the output of the Wordcount application is actually a continuous stream of updates, where each output record (i.e. each line in the original output above) is
+an updated count of a single word, aka record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
+
+<p>
+The two diagrams below illustrate what is essentially happening behind the scenes.
+The first column shows the evolution of the current state of the <code>KTable&lt;String, Long&gt;</code> that is counting word occurrences for <code>count</code>.
+The second column shows the change records that result from state updates to the KTable and that are being sent to the output Kafka topic <b>streams-wordcount-output</b>.
+</p>
+
+<img src="/{{version}}/images/streams-table-updates-02.png" style="float: right; width: 25%;">
+<img src="/{{version}}/images/streams-table-updates-01.png" style="float: right; width: 25%;">
+
+<p>
+First the text line "all streams lead to kafka" is being processed.
+The <code>KTable</code> is being built up as each new word results in a new table entry (highlighted with a green background), and a corresponding change record is sent to the downstream <code>KStream</code>.
+</p>
+<p>
+When the second text line "hello kafka streams" is processed, we observe, for the first time, that existing entries in the <code>KTable</code> are being updated (here: for the words "kafka" and for "streams"). And again, change records are being sent to the output topic.
+</p>
+<p>
+And so on (we skip the illustration of how the third line is being processed). This explains why the output topic has the contents we showed above, because it contains the full record of changes.
+</p>
+
+<p>
+Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.
+</p>
+
+<h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: Tear down the application</a></h4>
+
+<p>You can now stop the console consumer, the console producer, the Wordcount application, the Kafka broker and the ZooKeeper server in order via <b>Ctrl-C</b>.</p>
+
+ <div class="pagination">
+        <a href="/{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>
+        <a href="/{{version}}/documentation/streams/tutorial" class="pagination__btn pagination__btn__next">Next</a>
+    </div>
+</script>
+
+<div class="p-quickstart-streams"></div>
+
+<!--#include virtual="../../includes/_header.htm" -->
+<!--#include virtual="../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+            <li><a href="/documentation/streams">Streams</a></li>
+        </ul>
+        <div class="p-content"></div>
+    </div>
+</div>
+<!--#include virtual="../../includes/_footer.htm" -->
+<script>
+$(function() {
+  // Show selected style on nav item
+  $('.b-nav__streams').addClass('selected');
+
+  // Display docs subnav items
+  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+});
+</script>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/cb024c13/100/streams/tutorial.html
----------------------------------------------------------------------
diff --git a/100/streams/tutorial.html b/100/streams/tutorial.html
new file mode 100644
index 0000000..ec41a93
--- /dev/null
+++ b/100/streams/tutorial.html
@@ -0,0 +1,638 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<script><!--#include virtual="../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+    <h1>Tutorial: Write a Streams Application</h1>
+
+    <p>
+        In this guide we will start from scratch on setting up your own project to write a stream processing application using Kafka Streams.
+        It is highly recommended to read the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a> first on how to run a Streams application written in Kafka Streams if you have not done so.
+    </p>
+
+    <h4><a id="tutorial_maven_setup" href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
+
+    <p>
+        We are going to use a Kafka Streams Maven Archetype for creating a Streams project structure with the following commands:
+    </p>
+
+    <pre class="brush: bash;">
+        mvn archetype:generate \
+            -DarchetypeGroupId=org.apache.kafka \
+            -DarchetypeArtifactId=streams-quickstart-java \
+            -DarchetypeVersion={{fullDotVersion}} \
+            -DgroupId=streams.examples \
+            -DartifactId=streams.examples \
+            -Dversion=0.1 \
+            -Dpackage=myapps
+    </pre>
+
+    <p>
+        You can use a different value for <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
+        Assuming the above parameter values are used, this command will create a project structure that looks like this:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; tree streams.examples
+        streams.examples
+        |-- pom.xml
+        |-- src
+            |-- main
+                |-- java
+                |   |-- myapps
+                |       |-- LineSplit.java
+                |       |-- Pipe.java
+                |       |-- WordCount.java
+                |-- resources
+                    |-- log4j.properties
+    </pre>
+
+    <p>
+        The <code>pom.xml</code> file included in the project already has the Streams dependency defined,
+        and there are already several example programs written with Streams library under <code>src/main/java</code>.
+        Since we are going to start writing such programs from scratch, we can now delete these examples:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; cd streams.examples
+        &gt; rm src/main/java/myapps/*.java
+    </pre>
+
+    <h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
+
+    It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a java file under <code>src/main/java</code>.
+    Let's name it <code>Pipe.java</code>:
+
+    <pre class="brush: java;">
+        package myapps;
+
+        public class Pipe {
+
+            public static void main(String[] args) throws Exception {
+
+            }
+        }
+    </pre>
+
+    <p>
+        We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go since IDEs can usually add them automatically.
+        However if you are using a text editor you need to manually add the imports, and at the end of this section we'll show the complete code snippet with import statement for you.
+    </p>
+
+    <p>
+        The first step to write a Streams application is to create a <code>java.util.Properties</code> map to specify different Streams execution configuration values as defined in <code>StreamsConfig</code>.
+        A couple of important configuration values you need to set are: <code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code>, which specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster,
+        and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application to distinguish it from other applications talking to the same Kafka cluster:
+    </p>
+
+    <pre class="brush: java;">
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
+    </pre>
+
+    <p>
+        In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:
+    </p>
+
+    <pre class="brush: java;">
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+    </pre>
+
+    <p>
+        For a full list of configurations of Kafka Streams please refer to this <a href="/{{version}}/documentation/#streamsconfigs">table</a>.
+    </p>
+
+    <p>
+        Next we will define the computational logic of our Streams application.
+        In Kafka Streams this computational logic is defined as a <code>topology</code> of connected processor nodes.
+        We can use a topology builder to construct such a topology,
+    </p>
+
+    <pre class="brush: java;">
+        final StreamsBuilder builder = new StreamsBuilder();
+    </pre>
+
+    <p>
+        And then create a source stream from a Kafka topic named <code>streams-plaintext-input</code> using this topology builder:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+    </pre>
+
+    <p>
+        Now we get a <code>KStream</code> that is continuously generating records from its source Kafka topic <code>streams-plaintext-input</code>.
+        The records are organized as <code>String</code> typed key-value pairs.
+        The simplest thing we can do with this stream is to write it into another Kafka topic, say it's named <code>streams-pipe-output</code>:
+    </p>
+
+    <pre class="brush: java;">
+        source.to("streams-pipe-output");
+    </pre>
+
+    <p>
+        Note that we can also concatenate the above two lines into a single line as:
+    </p>
+
+    <pre class="brush: java;">
+        builder.stream("streams-plaintext-input").to("streams-pipe-output");
+    </pre>
+
+    <p>
+        We can inspect what kind of <code>topology</code> is created from this builder by doing the following:
+    </p>
+
+    <pre class="brush: java;">
+        final Topology topology = builder.build();
+    </pre>
+
+    <p>
+        And print its description to standard output as:
+    </p>
+
+    <pre class="brush: java;">
+        System.out.println(topology.describe());
+    </pre>
+
+    <p>
+        If we just stop here, compile and run the program, it will output the following information:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
+            Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
+        Global Stores:
+          none
+    </pre>
+
+    <p>
+        As shown above, it illustrates that the constructed topology has two processor nodes, a source node <code>KSTREAM-SOURCE-0000000000</code> and a sink node <code>KSTREAM-SINK-0000000001</code>.
+        <code>KSTREAM-SOURCE-0000000000</code> continuously reads records from Kafka topic <code>streams-plaintext-input</code> and pipes them to its downstream node <code>KSTREAM-SINK-0000000001</code>;
+        <code>KSTREAM-SINK-0000000001</code> will write each of its received records in order to another Kafka topic <code>streams-pipe-output</code>
+        (the <code>--&gt;</code> and <code>&lt;--</code> arrows indicate the downstream and upstream processor nodes of this node, i.e. "children" and "parents" within the topology graph).
+        It also illustrates that this simple topology has no global state stores associated with it (we will talk about state stores more in the following sections).
+    </p>
+
+    <p>
+        Note that we can always describe the topology as we did above at any given point while we are building it in the code, so as a user you can interactively "try and taste" your computational logic defined in the topology until you are happy with it.
+        Suppose we are already done with this simple topology that just pipes data from one Kafka topic to another in an endless streaming manner,
+        we can now construct the Streams client with the two components we have just constructed above: the configuration map and the topology object
+        (one can also construct a <code>StreamsConfig</code> object from the <code>props</code> map and then pass that object to the constructor,
+        <code>KafkaStreams</code> has overloaded constructors that take either type).
+    </p>
+
+    <pre class="brush: java;">
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+    </pre>
+
+    <p>
+        By calling its <code>start()</code> function we can trigger the execution of this client.
+        The execution won't stop until <code>close()</code> is called on this client.
+        We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
+    </p>
+
+    <pre class="brush: java;">
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    </pre>
+
+    <p>
+        The complete code so far looks like this:
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class Pipe {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                builder.stream("streams-plaintext-input").to("streams-pipe-output");
+
+                final Topology topology = builder.build();
+
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // attach shutdown handler to catch control-c
+                Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+                    @Override
+                    public void run() {
+                        streams.close();
+                        latch.countDown();
+                    }
+                });
+
+                try {
+                    streams.start();
+                    latch.await();
+                } catch (Throwable e) {
+                    System.exit(1);
+                }
+                System.exit(0);
+            }
+        }
+    </pre>
+
+    <p>
+        If you already have the Kafka broker up and running at <code>localhost:9092</code>,
+        and the topics <code>streams-plaintext-input</code> and <code>streams-pipe-output</code> created on that broker,
+        you can run this code in your IDE or on the command line, using Maven:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
+    </pre>
+
+    <p>
+        For detailed instructions on how to run a Streams application and observe its computing results,
+        please read the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
+        We will not talk about this in the rest of this section.
+    </p>
+
+    <h4><a id="tutorial_code_linesplit" href="#tutorial_code_linesplit">Writing a second Streams application: Line Split</a></h4>
+
+    <p>
+        We have learned how to construct a Streams client with its two key components: the <code>StreamsConfig</code> and <code>Topology</code>.
+        Now let's move on to add some real processing logic by augmenting the current topology.
+        We can create another program by first copying the existing <code>Pipe.java</code> class:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
+    </pre>
+
+    <p>
+        And change its class name as well as the application id config to distinguish it from the original program:
+    </p>
+
+    <pre class="brush: java;">
+        public class LineSplit {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
+                // ...
+            }
+        }
+    </pre>
+
+    <p>
+        Since each of the source stream's records is a <code>String</code> typed key-value pair,
+        let's treat the value string as a text line and split it into words with a <code>FlatMapValues</code> operator:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        KStream&lt;String, String&gt; words = source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return Arrays.asList(value.split("\\W+"));
+                    }
+                });
+    </pre>
+
+    <p>
+        The operator will take the <code>source</code> stream as its input, and generate a new stream named <code>words</code>
+        by processing each record from its source stream in order and breaking its value string into a list of words, and producing
+        each word as a new record to the output <code>words</code> stream.
+        This is a stateless operator that does not need to keep track of any previously received records or processed results.
+        Note if you are using JDK 8 you can use lambda expression and simplify the above code as:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        KStream&lt;String, String&gt; words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
+    </pre>
+
+    <p>
+        And finally we can write the word stream back into another Kafka topic, say <code>streams-linesplit-output</code>.
+        Again, these two steps can be concatenated as the following (assuming lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+              .to("streams-linesplit-output");
+    </pre>
+
+    <p>
+        If we now describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.LineSplit
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
+            Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
+          Global Stores:
+            none
+    </pre>
+
+    <p>
+        As we can see above, a new processor node <code>KSTREAM-FLATMAPVALUES-0000000001</code> is injected into the topology between the original source and sink nodes.
+        It takes the source node as its parent and the sink node as its child.
+        In other words, each record fetched by the source node will first traverse to the newly added <code>KSTREAM-FLATMAPVALUES-0000000001</code> node to be processed,
+        and one or more new records will be generated as a result. They will continue to traverse down to the sink node to be written back to Kafka.
+        Note this processor node is "stateless" as it is not associated with any stores (i.e. <code>(stores: [])</code>).
+    </p>
+
+    <p>
+        The complete code looks like this (assuming lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+        import org.apache.kafka.streams.kstream.KStream;
+
+        import java.util.Arrays;
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class LineSplit {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+                source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+                      .to("streams-linesplit-output");
+
+                final Topology topology = builder.build();
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // ... same as Pipe.java above
+            }
+        }
+    </pre>
+
+    <h4><a id="tutorial_code_wordcount" href="#tutorial_code_wordcount">Writing a third Streams application: Wordcount</a></h4>
+
+    <p>
+        Let's now take a step further to add some "stateful" computations to the topology by counting the occurrence of the words split from the source text stream.
+        Following similar steps let's create another program based on the <code>LineSplit.java</code> class:
+    </p>
+
+    <pre class="brush: java;">
+        public class WordCount {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+                // ...
+            }
+        }
+    </pre>
+
+    <p>
+        In order to count the words we can first modify the <code>flatMapValues</code> operator to treat all of them as lower case (shown here with an anonymous class rather than a lambda expression):
+    </p>
+
+    <pre class="brush: java;">
+        source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                });
+    </pre>
+
+    <p>
+        In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower cased word, with a <code>groupBy</code> operator.
+        This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator, which generates a running count on each of the grouped keys:
+    </p>
+
+    <pre class="brush: java;">
+        KTable&lt;String, Long&gt; counts =
+        source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                })
+              .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
+                   @Override
+                   public String apply(String key, String value) {
+                       return value;
+                   }
+                })
+              // Materialize the result into a KeyValueStore named "counts-store".
+              // The Materialized store is always of type &lt;Bytes, byte[]&gt; as this is the format of the inner most store.
+              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt; as("counts-store"));
+    </pre>
+
+    <p>
+        Note that the <code>count</code> operator has a <code>Materialized</code> parameter that specifies that the
+        running count should be stored in a state store named <code>counts-store</code>.
+        This <code>counts-store</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
+    </p>
+
+    <p>
+        We can also write the <code>counts</code> KTable's changelog stream back into another Kafka topic, say <code>streams-wordcount-output</code>.
+        Because the result is a changelog stream, the output topic <code>streams-wordcount-output</code> should be configured with log compaction enabled.
+        Note that this time the value type is no longer <code>String</code> but <code>Long</code>, so the default serialization classes are not viable for writing it to Kafka anymore.
+        We need to provide overridden serialization methods for <code>Long</code> types, otherwise a runtime exception will be thrown:
+    </p>
+
+    <pre class="brush: java;">
+        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+    </pre>
+
+    <p>
+        Note that in order to read the changelog stream from topic <code>streams-wordcount-output</code>,
+        one needs to set the value deserialization as <code>org.apache.kafka.common.serialization.LongDeserializer</code>.
+        Details of this can be found in the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
+        Assuming lambda expression from JDK 8 can be used, the above code can be simplified as:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+              .groupBy((key, value) -> value)
+              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"))
+              .toStream()
+              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+    </pre>
+
+    <p>
+        If we again describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.WordCount
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
+            Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
+            Sink: KSTREAM-SINK-0000000004(topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
+          Sub-topology: 1
+            Source: KSTREAM-SOURCE-0000000006(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
+            Processor: KSTREAM-AGGREGATE-0000000003(stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
+            Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
+            Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
+        Global Stores:
+          none
+    </pre>
+
+    <p>
+        As we can see above, the topology now contains two disconnected sub-topologies.
+        The first sub-topology's sink node <code>KSTREAM-SINK-0000000004</code> will write to a repartition topic <code>counts-store-repartition</code>,
+        which will be read by the second sub-topology's source node <code>KSTREAM-SOURCE-0000000006</code>.
+        The repartition topic is used to "shuffle" the source stream by its aggregation key, which is in this case the value string.
+        In addition, inside the first sub-topology a stateless <code>KSTREAM-FILTER-0000000005</code> node is injected between the grouping <code>KSTREAM-KEY-SELECT-0000000002</code> node and the sink node to filter out any intermediate record whose aggregate key is empty.
+    </p>
+    <p>
+        In the second sub-topology, the aggregation node <code>KSTREAM-AGGREGATE-0000000003</code> is associated with a state store named <code>counts-store</code> (the name is specified by the user in the <code>count</code> operator).
+        Upon receiving each record from its upstream source node, the aggregation processor will first query its associated <code>counts-store</code> store to get the current count for that key, increment it by one, and then write the new count back to the store.
+        Each updated count for the key will also be piped downstream to the <code>KTABLE-TOSTREAM-0000000007</code> node, which interprets this update stream as a record stream before further piping to the sink node <code>KSTREAM-SINK-0000000008</code> for writing back to Kafka.
+    </p>
+
+    <p>
+        The complete code looks like this (assuming lambda expressions are used):
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
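+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.common.utils.Bytes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+        import org.apache.kafka.streams.kstream.KStream;
+        import org.apache.kafka.streams.kstream.Materialized;
+        import org.apache.kafka.streams.kstream.Produced;
+        import org.apache.kafka.streams.state.KeyValueStore;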
+
+        import java.util.Arrays;
+        import java.util.Locale;
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class WordCount {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+                source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+                      .groupBy((key, value) -> value)
+                      .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"))
+                      .toStream()
+                      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+
+                final Topology topology = builder.build();
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // ... same as Pipe.java above
+            }
+        }
+    </pre>
+
+    <div class="pagination">
+        <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__prev">Previous</a>
+        <a href="/{{version}}/documentation/streams/developer-guide" class="pagination__btn pagination__btn__next">Next</a>
+    </div>
+</script>
+
+<div class="p-quickstart-streams"></div>
+
+<!--#include virtual="../../includes/_header.htm" -->
+<!--#include virtual="../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+            <li><a href="/documentation/streams">Streams</a></li>
+        </ul>
+        <div class="p-content"></div>
+    </div>
+</div>
+<!--#include virtual="../../includes/_footer.htm" -->
+<script>
+$(function() {
+  // Show selected style on nav item
+  $('.b-nav__streams').addClass('selected');
+
+  // Display docs subnav items
+  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+});
+</script>


Mime
View raw message