kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [01/15] kafka-site git commit: MINOR: Improve Streams Dev Guide
Date Wed, 20 Dec 2017 21:23:17 GMT
Repository: kafka-site
Updated Branches:
  refs/heads/asf-site 6752bf4a1 -> 12dbff55d


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/10/tutorial.html
----------------------------------------------------------------------
diff --git a/10/tutorial.html b/10/tutorial.html
new file mode 100644
index 0000000..7d75ed6
--- /dev/null
+++ b/10/tutorial.html
@@ -0,0 +1,663 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<script><!--#include virtual="../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+    <h1>Tutorial: Write a Streams Application</h1>
+    <div class="sub-nav-sticky">
+      <div class="sticky-top">
+        <div style="height:35px">
+          <a href="/{{version}}/documentation/streams/">Introduction</a>
+          <a href="/{{version}}/documentation/streams/developer-guide/">Developer Guide</a>
+          <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+          <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+          <a class="active-menu-item" href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+        </div>
+      </div>
+  </div> 
+    <p>
+        In this guide we will set up a project from scratch and write a stream processing application using Kafka Streams.
+        If you have not done so already, it is highly recommended to read the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a> first to learn how to run an existing Streams application.
+    </p>
+
+    <h4><a id="tutorial_maven_setup" href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
+
+    <p>
+        We are going to use a Kafka Streams Maven Archetype for creating a Streams project structure with the following commands:
+    </p>
+
+    <pre class="brush: bash;">
+        mvn archetype:generate \
+            -DarchetypeGroupId=org.apache.kafka \
+            -DarchetypeArtifactId=streams-quickstart-java \
+            -DarchetypeVersion={{fullDotVersion}} \
+            -DgroupId=streams.examples \
+            -DartifactId=streams-quickstart \
+            -Dversion=0.1 \
+            -Dpackage=myapps
+    </pre>
+
+    <p>
+        You can use different values for the <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
+        Assuming the above parameter values are used, this command will create a project structure that looks like this:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; tree streams-quickstart
+        streams-quickstart
+        |-- pom.xml
+        |-- src
+            |-- main
+                |-- java
+                |   |-- myapps
+                |       |-- LineSplit.java
+                |       |-- Pipe.java
+                |       |-- WordCount.java
+                |-- resources
+                    |-- log4j.properties
+    </pre>
+
+    <p>
+        The <code>pom.xml</code> file included in the project already has the Streams dependency defined,
+        and there are already several example programs written with the Streams library under <code>src/main/java</code>.
+        Since we are going to start writing such programs from scratch, we can now delete these examples:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; cd streams-quickstart
+        &gt; rm src/main/java/myapps/*.java
+    </pre>
+
+    <h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
+
+    <p>
+        It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a Java file under <code>src/main/java</code>.
+        Let's name it <code>Pipe.java</code>:
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        public class Pipe {
+
+            public static void main(String[] args) throws Exception {
+
+            }
+        }
+    </pre>
+
+    <p>
+        We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go, since IDEs can usually add them automatically.
+        However, if you are using a text editor you will need to add the imports manually; at the end of this section we'll show the complete code snippet with import statements for you.
+    </p>
+
+    <p>
+        The first step to write a Streams application is to create a <code>java.util.Properties</code> map to specify different Streams execution configuration values as defined in <code>StreamsConfig</code>.
+        A couple of important configuration values you need to set are: <code>StreamsConfig.BOOTSTRAP_SERVERS_CONFIG</code>, which specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster,
+        and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives a unique identifier for your Streams application to distinguish it from other applications talking to the same Kafka cluster:
+    </p>
+
+    <pre class="brush: java;">
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming the Kafka broker this application is talking to runs on the local machine on port 9092
+    </pre>
+
+    <p>
+        In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:
+    </p>
+
+    <pre class="brush: java;">
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+    </pre>
+
+    <p>
+        For a full list of Kafka Streams configurations, please refer to this <a href="/{{version}}/documentation/#streamsconfigs">table</a>.
+    </p>
+
+    <p>
+        Next we will define the computational logic of our Streams application.
+        In Kafka Streams this computational logic is defined as a <code>topology</code> of connected processor nodes.
+        We can use a topology builder to construct such a topology:
+    </p>
+
+    <pre class="brush: java;">
+        final StreamsBuilder builder = new StreamsBuilder();
+    </pre>
+
+    <p>
+        And then create a source stream from a Kafka topic named <code>streams-plaintext-input</code> using this topology builder:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+    </pre>
+
+    <p>
+        Now we get a <code>KStream</code> that is continuously generating records from its source Kafka topic <code>streams-plaintext-input</code>.
+        The records are organized as <code>String</code>-typed key-value pairs.
+        The simplest thing we can do with this stream is to write it into another Kafka topic, say it's named <code>streams-pipe-output</code>:
+    </p>
+
+    <pre class="brush: java;">
+        source.to("streams-pipe-output");
+    </pre>
+
+    <p>
+        Note that we can also concatenate the above two lines into a single line as:
+    </p>
+
+    <pre class="brush: java;">
+        builder.stream("streams-plaintext-input").to("streams-pipe-output");
+    </pre>
+
+    <p>
+        We can inspect what kind of <code>topology</code> is created from this builder by doing the following:
+    </p>
+
+    <pre class="brush: java;">
+        final Topology topology = builder.build();
+    </pre>
+
+    <p>
+        And print its description to standard output as:
+    </p>
+
+    <pre class="brush: java;">
+        System.out.println(topology.describe());
+    </pre>
+
+    <p>
+        If we just stop here, compile and run the program, it will output the following information:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
+            Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
+        Global Stores:
+          none
+    </pre>
+
+    <p>
+        As shown above, the output illustrates that the constructed topology has two processor nodes, a source node <code>KSTREAM-SOURCE-0000000000</code> and a sink node <code>KSTREAM-SINK-0000000001</code>.
+        <code>KSTREAM-SOURCE-0000000000</code> continuously reads records from the Kafka topic <code>streams-plaintext-input</code> and pipes them to its downstream node <code>KSTREAM-SINK-0000000001</code>;
+        <code>KSTREAM-SINK-0000000001</code> will write each of the records it receives, in order, to another Kafka topic <code>streams-pipe-output</code>
+        (the <code>--&gt;</code> and <code>&lt;--</code> arrows indicate the downstream and upstream processor nodes of this node, i.e. its "children" and "parents" within the topology graph).
+        It also illustrates that this simple topology has no global state stores associated with it (we will talk more about state stores in the following sections).
+    </p>
+
+    <p>
+        Note that we can describe the topology as we did above at any point while we are building it in the code, so as a user you can interactively "try and taste" the computational logic defined in the topology until you are happy with it.
+        Suppose we are already done with this simple topology that just pipes data from one Kafka topic to another in an endless streaming manner;
+        we can now construct the Streams client with the two components we have just constructed above: the configuration map and the topology object
+        (one can also construct a <code>StreamsConfig</code> object from the <code>props</code> map and then pass that object to the constructor;
+        <code>KafkaStreams</code> has overloaded constructors that take either type).
+    </p>
+
+    <pre class="brush: java;">
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+    </pre>
+
+    <p>
+        By calling its <code>start()</code> function we can trigger the execution of this client.
+        The execution won't stop until <code>close()</code> is called on this client.
+        We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
+    </p>
+
+    <pre class="brush: java;">
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    </pre>
+
+    <p>
+        The complete code so far looks like this:
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class Pipe {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                builder.stream("streams-plaintext-input").to("streams-pipe-output");
+
+                final Topology topology = builder.build();
+
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // attach shutdown handler to catch control-c
+                Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+                    @Override
+                    public void run() {
+                        streams.close();
+                        latch.countDown();
+                    }
+                });
+
+                try {
+                    streams.start();
+                    latch.await();
+                } catch (Throwable e) {
+                    System.exit(1);
+                }
+                System.exit(0);
+            }
+        }
+    </pre>
+
+    <p>
+        If you already have the Kafka broker up and running at <code>localhost:9092</code>,
+        and the topics <code>streams-plaintext-input</code> and <code>streams-pipe-output</code> created on that broker,
+        you can run this code in your IDE or on the command line, using Maven:
+    </p>
+
+    <pre class="brush: brush;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
+    </pre>
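+
+    <p>
+        If the two topics do not exist yet on that broker, you can create them with the <code>kafka-topics.sh</code> tool as shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a>;
+        alternatively, the following is only a sketch of creating them programmatically with the <code>AdminClient</code>, assuming the broker address used throughout this tutorial:
+    </p>
+
+    <pre class="brush: java;">
+        Properties adminProps = new Properties();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // same broker address as above
+        try (AdminClient admin = AdminClient.create(adminProps)) {
+            admin.createTopics(Arrays.asList(
+                    new NewTopic("streams-plaintext-input", 1, (short) 1),
+                    new NewTopic("streams-pipe-output", 1, (short) 1)
+            )).all().get();   // block until both topics have been created
+        }
+    </pre>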
+
+    <p>
+        For detailed instructions on how to run a Streams application and observe its computing results,
+        please read the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
+        We will not talk about this in the rest of this section.
+    </p>
+
+    <h4><a id="tutorial_code_linesplit" href="#tutorial_code_linesplit">Writing a second Streams application: Line Split</a></h4>
+
+    <p>
+        We have learned how to construct a Streams client with its two key components: the <code>StreamsConfig</code> and <code>Topology</code>.
+        Now let's move on to add some real processing logic by augmenting the current topology.
+        We can create another program by first copying the existing <code>Pipe.java</code> class:
+    </p>
+
+    <pre class="brush: brush;">
+        &gt; cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
+    </pre>
+
+    <p>
+        And change its class name as well as the application id config to distinguish it from the original program:
+    </p>
+
+    <pre class="brush: java;">
+        public class LineSplit {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
+                // ...
+            }
+        }
+    </pre>
+
+    <p>
+        Since each record of the source stream is a <code>String</code>-typed key-value pair,
+        let's treat the value string as a text line and split it into words with the <code>flatMapValues</code> operator:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        KStream&lt;String, String&gt; words = source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return Arrays.asList(value.split("\\W+"));
+                    }
+                });
+    </pre>
+
+    <p>
+        The operator takes the <code>source</code> stream as its input and generates a new stream named <code>words</code>:
+        it processes each record from its source stream in order, breaks the record's value string into a list of words,
+        and produces each word as a new record to the output <code>words</code> stream.
+        This is a stateless operator that does not need to keep track of any previously received records or processed results.
+        Note that if you are using JDK 8 you can use a lambda expression and simplify the above code as:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        KStream&lt;String, String&gt; words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
+    </pre>
+
+    <p>
+        And finally we can write the word stream back into another Kafka topic, say <code>streams-linesplit-output</code>.
+        Again, these two steps can be concatenated as follows (assuming a lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+              .to("streams-linesplit-output");
+    </pre>
+
+    <p>
+        If we now describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.LineSplit
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
+            Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
+          Global Stores:
+            none
+    </pre>
+
+    <p>
+        As we can see above, a new processor node <code>KSTREAM-FLATMAPVALUES-0000000001</code> is injected into the topology between the original source and sink nodes.
+        It takes the source node as its parent and the sink node as its child.
+        In other words, each record fetched by the source node will first traverse to the newly added <code>KSTREAM-FLATMAPVALUES-0000000001</code> node to be processed,
+        and one or more new records will be generated as a result. They will then continue traversing down to the sink node to be written back to Kafka.
+        Note this processor node is "stateless" as it is not associated with any stores (i.e. <code>(stores: [])</code>).
+    </p>
+
+    <p>
+        The complete code looks like this (assuming lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+        import org.apache.kafka.streams.kstream.KStream;
+
+        import java.util.Arrays;
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class LineSplit {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+                source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+                      .to("streams-linesplit-output");
+
+                final Topology topology = builder.build();
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // ... same as Pipe.java above
+            }
+        }
+    </pre>
+
+    <h4><a id="tutorial_code_wordcount" href="#tutorial_code_wordcount">Writing a third Streams application: Wordcount</a></h4>
+
+    <p>
+        Let's now take a step further to add some "stateful" computations to the topology by counting the occurrences of the words split from the source text stream.
+        Following similar steps, let's create another program based on the <code>LineSplit.java</code> class:
+    </p>
+
+    <pre class="brush: java;">
+        public class WordCount {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+                // ...
+            }
+        }
+    </pre>
+
+    <p>
+        In order to count the words we can first modify the <code>flatMapValues</code> operator to treat all of the words as lowercase:
+    </p>
+
+    <pre class="brush: java;">
+        source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                });
+    </pre>
+
+    <p>
+        In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lowercased word, with a <code>groupBy</code> operator.
+        This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator that generates a running count on each of the grouped keys:
+    </p>
+
+    <pre class="brush: java;">
+        KTable&lt;String, Long&gt; counts =
+        source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                    @Override
+                    public Iterable&lt;String&gt; apply(String value) {
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                })
+              .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
+                   @Override
+                   public String apply(String key, String value) {
+                       return value;
+                   }
+                })
+              // Materialize the result into a KeyValueStore named "counts-store".
+              // The Materialized store is always of type &lt;Bytes, byte[]&gt; as this is the format of the innermost store.
+              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
+    </pre>
+
+    <p>
+        Note that the <code>count</code> operator has a <code>Materialized</code> parameter that specifies that the
+        running count should be stored in a state store named <code>counts-store</code>.
+        This <code>counts-store</code> can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Guide</a>.
+    </p>
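+
+    <p>
+        As a quick illustration (not part of the topology itself), once the application below is up and running, this store can be fetched from the Streams client and queried directly;
+        the following is only a sketch, assuming the <code>streams</code> client constructed later in this section and the interactive-query helper classes from <code>org.apache.kafka.streams.state</code>:
+    </p>
+
+    <pre class="brush: java;">
+        // fetch the materialized store from the running KafkaStreams instance
+        ReadOnlyKeyValueStore&lt;String, Long&gt; keyValueStore =
+            streams.store("counts-store", QueryableStoreTypes.keyValueStore());
+        System.out.println("count for hello: " + keyValueStore.get("hello"));   // "hello" is just an example key
+    </pre>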
+
+    <p>
+        We can also write the <code>counts</code> KTable's changelog stream back into another Kafka topic, say <code>streams-wordcount-output</code>.
+        Because the result is a changelog stream, the output topic <code>streams-wordcount-output</code> should be configured with log compaction enabled.
+        Note that this time the value type is no longer <code>String</code> but <code>Long</code>, so the default serialization classes are not viable for writing it to Kafka anymore.
+        We need to provide overridden serialization methods for <code>Long</code> types, otherwise a runtime exception will be thrown:
+    </p>
+
+    <pre class="brush: java;">
+        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+    </pre>
+
+    <p>
+        Note that in order to read the changelog stream from the topic <code>streams-wordcount-output</code>,
+        one needs to set the value deserializer to <code>org.apache.kafka.common.serialization.LongDeserializer</code>.
+        Details of this can be found in the <a href="/{{version}}/documentation/streams/quickstart">Play with a Streams Application</a> section.
+        Assuming lambda expressions from JDK 8 can be used, the above code can be simplified as:
+    </p>
+
+    <pre class="brush: java;">
+        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+              .groupBy((key, value) -> value)
+              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"))
+              .toStream()
+              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
+    </pre>
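+
+    <p>
+        As a side note, a consumer of <code>streams-wordcount-output</code> must configure a <code>Long</code> value deserializer as mentioned above;
+        the following is only a sketch of such a consumer, assuming the broker address and topic names used throughout this tutorial (the group id is a hypothetical name):
+    </p>
+
+    <pre class="brush: java;">
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-output-reader");   // hypothetical group id
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+
+        try (KafkaConsumer&lt;String, Long&gt; consumer = new KafkaConsumer&lt;&gt;(consumerProps)) {
+            consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
+            while (true) {
+                for (ConsumerRecord&lt;String, Long&gt; record : consumer.poll(1000)) {
+                    System.out.println(record.key() + " : " + record.value());
+                }
+            }
+        }
+    </pre>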
+
+    <p>
+        If we again describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
+    </p>
+
+    <pre class="brush: bash;">
+        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.WordCount
+        Sub-topologies:
+          Sub-topology: 0
+            Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
+            Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
+            Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
+            Sink: KSTREAM-SINK-0000000004(topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
+          Sub-topology: 1
+            Source: KSTREAM-SOURCE-0000000006(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
+            Processor: KSTREAM-AGGREGATE-0000000003(stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
+            Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
+            Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
+        Global Stores:
+          none
+    </pre>
+
+    <p>
+        As we can see above, the topology now contains two disconnected sub-topologies.
+        The first sub-topology's sink node <code>KSTREAM-SINK-0000000004</code> will write to a repartition topic <code>counts-store-repartition</code>,
+        which will be read by the second sub-topology's source node <code>KSTREAM-SOURCE-0000000006</code>.
+        The repartition topic is used to "shuffle" the source stream by its aggregation key, which is in this case the value string.
+        In addition, inside the first sub-topology a stateless <code>KSTREAM-FILTER-0000000005</code> node is injected between the grouping <code>KSTREAM-KEY-SELECT-0000000002</code> node and the sink node to filter out any intermediate record whose aggregate key is empty.
+    </p>
+    <p>
+        In the second sub-topology, the aggregation node <code>KSTREAM-AGGREGATE-0000000003</code> is associated with a state store named <code>counts-store</code> (the name is specified by the user in the <code>count</code> operator).
+        Upon receiving each record from its upstream source node, the aggregation processor will first query its associated <code>counts-store</code> to get the current count for that key, augment it by one, and then write the new count back to the store.
+        Each updated count for the key will also be piped downstream to the <code>KTABLE-TOSTREAM-0000000007</code> node, which interprets this update stream as a record stream before piping it further to the sink node <code>KSTREAM-SINK-0000000008</code> for writing back to Kafka.
+    </p>
+
+    <p>
+        The complete code looks like this (assuming lambda expression is used):
+    </p>
+
+    <pre class="brush: java;">
+        package myapps;
+
+        import org.apache.kafka.common.serialization.Serdes;
+        import org.apache.kafka.common.utils.Bytes;
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsBuilder;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.Topology;
+        import org.apache.kafka.streams.kstream.KStream;
+        import org.apache.kafka.streams.kstream.Materialized;
+        import org.apache.kafka.streams.kstream.Produced;
+        import org.apache.kafka.streams.state.KeyValueStore;
+
+        import java.util.Arrays;
+        import java.util.Locale;
+        import java.util.Properties;
+        import java.util.concurrent.CountDownLatch;
+
+        public class WordCount {
+
+            public static void main(String[] args) throws Exception {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                final StreamsBuilder builder = new StreamsBuilder();
+
+                KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+                source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+                      .groupBy((key, value) -> value)
+                      .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"))
+                      .toStream()
+                      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
+
+                final Topology topology = builder.build();
+                final KafkaStreams streams = new KafkaStreams(topology, props);
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // ... same as Pipe.java above
+            }
+        }
+    </pre>
+
+    <div class="pagination">
+        <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__prev">Previous</a>
+        <a href="/{{version}}/documentation/streams/developer-guide" class="pagination__btn pagination__btn__next">Next</a>
+    </div>
+</script>
+
+<div class="p-quickstart-streams"></div>
+
+<!--#include virtual="../../includes/_header.htm" -->
+<!--#include virtual="../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+            <li><a href="/documentation/streams">Kafka Streams</a></li>
+        </ul>
+        <div class="p-content"></div>
+    </div>
+</div>
+<!--#include virtual="../../includes/_footer.htm" -->
+<script>
+$(function() {
+  // Show selected style on nav item
+  $('.b-nav__streams').addClass('selected');
+
+     //sticky secondary nav
+          var $navbar = $(".sub-nav-sticky"),
+               y_pos = $navbar.offset().top,
+               height = $navbar.height();
+       
+           $(window).scroll(function() {
+               var scrollTop = $(window).scrollTop();
+           
+               if (scrollTop > y_pos - height) {
+                   $navbar.addClass("navbar-fixed")
+               } else if (scrollTop <= y_pos) {
+                   $navbar.removeClass("navbar-fixed")
+               }
+           });
+
+  // Display docs subnav items
+  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+});
+</script>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/10/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/10/upgrade-guide.html b/10/upgrade-guide.html
new file mode 100644
index 0000000..d1726a7
--- /dev/null
+++ b/10/upgrade-guide.html
@@ -0,0 +1,390 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+    <h1>Upgrade Guide &amp; API Changes</h1>
+
+    <p>
+        If you want to upgrade from 0.11.0.x to 1.0.0 you don't need to make any code changes as the public API is fully backward compatible.
+        However, some public APIs were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
+        See <a href="#streams_api_changes_100">below</a> for a complete list of 1.0.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+    </p>
+
+    <p>
+        If you want to upgrade from 0.10.2.x to 0.11.0 you don't need to make any code changes as the public API is fully backward compatible.
+        However, some configuration parameters were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
+        See <a href="#streams_api_changes_0110">below</a> for a complete list of 0.11.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+    </p>
+
+    <p>
+        If you want to upgrade from 0.10.1.x to 0.10.2, see the <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
+        It highlights incompatible changes you need to consider to upgrade your code and application.
+        See <a href="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+    </p>
+
+    <p>
+        If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams"><b>Upgrade Section for 0.10.1</b></a>.
+        It highlights incompatible changes you need to consider to upgrade your code and application.
+        See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+    </p>
+
+    <h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
+
+    <p>
+        With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use.
+        This change includes the five main classes <code>KafkaStreams</code>, <code>KStreamBuilder</code>,
+        <code>KStream</code>, <code>KTable</code>, and <code>TopologyBuilder</code> (and some others).
+        All changes are fully backward compatible as the old API is only deprecated but not removed.
+        We recommend moving to the new API as soon as you can.
+        We will summarize all API changes in the next paragraphs.
+    </p>
+
+    <p>
+        The two main classes to specify a topology via the DSL (<code>KStreamBuilder</code>)
+        or the Processor API (<code>TopologyBuilder</code>) were deprecated and replaced by
+        <code>StreamsBuilder</code> and <code>Topology</code> (both new classes are located in
+        package <code>org.apache.kafka.streams</code>).
+        Note that <code>StreamsBuilder</code> does not extend <code>Topology</code>, i.e.,
+        the class hierarchy is different now.
+        The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API.
+        However, some internal methods that were public in <code>KStreamBuilder</code>
+        and <code>TopologyBuilder</code> but not part of the actual API are not present
+        in the new classes any longer.
+        Furthermore, some overloads were simplified compared to the original classes.
+        See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API">KIP-120</a>
+        and <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
+        for full details.
+    </p>
+
+    <p>
+        Changing how a topology is specified also affects <code>KafkaStreams</code> constructors,
+        which now only accept a <code>Topology</code>.
+        Using the DSL builder class <code>StreamsBuilder</code> one can get the constructed
+        <code>Topology</code> via <code>StreamsBuilder#build()</code>.
+        Additionally, a new class <code>org.apache.kafka.streams.TopologyDescription</code>
+        (along with some dependent classes) was added.
+        Those can be used to get a detailed description of the specified topology
+        and can be obtained by calling <code>Topology#describe()</code>.
+        An example using this new API is shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
+    </p>
+
+    <p>
+        New methods in <code>KStream</code>:
+    </p>
+    <ul>
+        <li>With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream">KIP-202</a>
+            a new method <code>merge()</code> has been created in <code>KStream</code> as the StreamsBuilder class's <code>StreamsBuilder#merge()</code> has been removed.
+            The method signature was also changed: instead of providing multiple <code>KStream</code>s to the method at once, only a single <code>KStream</code> is now accepted (see the sketch after this list).
+        </li>
+    </ul>
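+
+    <p>
+        For illustration, merging two streams with the new method might look like the following sketch (the topic names are placeholders):
+    </p>
+
+    <pre class="brush: java;">
+        // "topic-one" and "topic-two" are placeholder topic names
+        KStream&lt;String, String&gt; stream = builder.stream("topic-one");
+        KStream&lt;String, String&gt; otherStream = builder.stream("topic-two");
+        KStream&lt;String, String&gt; merged = stream.merge(otherStream);
+    </pre>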
+
+    <p>
+        New methods in <code>KafkaStreams</code>:
+    </p>
+    <ul>
+        <li>retrieve the current runtime information about the local threads via <code>#localThreadsMetadata()</code> </li>
+        <li>observe the restoration of all state stores via <code>#setGlobalStateRestoreListener()</code>, in which users can provide their customized implementation of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface</li>
+    </ul>
+
+    <p>
+        Deprecated / modified methods in <code>KafkaStreams</code>:
+    </p>
+    <ul>
+        <li>
+            <code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information.
+            They have been deprecated in favor of using the new classes/methods <code>#localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and
+            <code>TopologyDescription</code> / <code>Topology#describe()</code> (returning static information).
+        </li>
+        <li>
+            With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
+            you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations.
+            If you can't rely on using <code>toString</code> to print your keys and values, you should instead provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call, as sketched after this list.
+        </li>
+        <li>
+            <code>setStateListener()</code> now can only be set before the application starts running, i.e. before <code>KafkaStreams.start()</code> is called.
+        </li>
+    </ul>
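+
+    <p>
+        A sketch of printing with a custom <code>KeyValueMapper</code>, assuming <code>stream</code> is a <code>KStream&lt;String, String&gt;</code> (the label and formatting are arbitrary choices for illustration):
+    </p>
+
+    <pre class="brush: java;">
+        stream.print(Printed.&lt;String, String&gt;toSysOut()
+                .withLabel("my-stream")                                  // "my-stream" is an arbitrary label
+                .withKeyValueMapper((key, value) -> key + " -> " + value));
+    </pre>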
+
+    <p>
+        Deprecated methods in <code>KGroupedStream</code>:
+    </p>
+    <ul>
+        <li>
+            Windowed aggregations have been deprecated from <code>KGroupedStream</code> and moved to <code>WindowedKStream</code>.
+            You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code> (see the sketch after this list).
+        </li>
+    </ul>
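+
+    <p>
+        For example, counting records per key over 5-minute windows could be sketched as follows, assuming <code>groupedStream</code> is a <code>KGroupedStream&lt;String, String&gt;</code> (the window size is arbitrary):
+    </p>
+
+    <pre class="brush: java;">
+        // 5-minute tumbling windows, chosen only for illustration
+        KTable&lt;Windowed&lt;String&gt;, Long&gt; windowedCounts = groupedStream
+                .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
+                .count();
+    </pre>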
+
+    <p>
+        Modified methods in <code>Processor</code>:
+    </p>
+    <ul>
+        <li>
+            <p>
+                The Processor API was extended to allow users to schedule <code>punctuate</code> functions either based on data-driven <b>stream time</b> or wall-clock time.
+                As a result, the original <code>ProcessorContext#schedule</code> is deprecated with a new overloaded function that accepts a user customizable <code>Punctuator</code> callback interface, which triggers its <code>punctuate</code> API method periodically based on the <code>PunctuationType</code>.
+                The <code>PunctuationType</code> determines what notion of time is used for the punctuation scheduling: either <a href="/{{version}}/documentation/streams/core-concepts#streams_time">stream time</a> or wall-clock time (by default, <b>stream time</b> is configured to represent event time via <code>TimestampExtractor</code>).
+                In addition, the <code>punctuate</code> function inside <code>Processor</code> is also deprecated.
+            </p>
+            <p>
+                Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only, because stream time is determined (and advanced forward) by the timestamps derived from the input data.
+                If there is no data arriving at the processor, the stream time will not advance and hence punctuation will not be triggered.
+                On the other hand, when wall-clock time (i.e. <code>PunctuationType.WALL_CLOCK_TIME</code>) is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
+                For example, if a <code>Punctuator</code> function is scheduled every 10 seconds based on <code>PunctuationType.STREAM_TIME</code> and a stream of 60 records with consecutive timestamps from 1 second to 60 seconds is processed, <code>punctuate</code> would be called 6 times, regardless of how long it takes to process those records;
+                whereas if the <code>Punctuator</code> function is scheduled every 10 seconds based on <code>PunctuationType.WALL_CLOCK_TIME</code> and these 60 records were processed within 20 seconds,
+                <code>punctuate</code> would be called 2 times (one time every 10 seconds);
+                if these 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
+                Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside the processor's <code>init()</code> method (see the sketch after this list).
+            </p>
+        </li>
+    </ul>
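+
+    <p>
+        As an illustration, scheduling a wall-clock punctuation inside a processor's <code>init()</code> method might look like the following sketch (the 10-second interval is arbitrary):
+    </p>
+
+    <pre class="brush: java;">
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = context;
+            // schedule a wall-clock based punctuation every 10 seconds (interval chosen for illustration)
+            context.schedule(10 * 1000L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                @Override
+                public void punctuate(final long timestamp) {
+                    // periodic work goes here, e.g. emitting buffered results downstream
+                }
+            });
+        }
+    </pre>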
+
+    <p>
+        If you are monitoring task-level or processor-node / state-store-level Streams metrics, please note that the metrics sensor name and hierarchy were changed:
+        the task IDs, store names and processor names are no longer part of the sensor metrics names, but instead are added as tags of the sensors to achieve a consistent metrics hierarchy.
+        As a result you may need to make corresponding code changes to your metrics reporting and monitoring tools when upgrading to 1.0.0.
+        Detailed metrics sensors can be found in the <a href="#kafka_streams_monitoring">Streams Monitoring</a> section.
+    </p>
+
+    <p>
+        The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers">KIP-161</a>
+        enables you to provide a default exception handler for deserialization errors when reading data from Kafka rather than throwing the exception all the way out of your streams application.
+        You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
+        The specified handler must implement the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.
+    </p>
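+
+    <p>
+        For example, to log and skip corrupted records instead of failing, one could configure the built-in <code>LogAndContinueExceptionHandler</code>; a minimal sketch:
+    </p>
+
+    <pre class="brush: java;">
+        Properties props = new Properties();
+        // skip records that cannot be deserialized instead of failing the application
+        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                  LogAndContinueExceptionHandler.class);
+    </pre>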
+
+    <p>
+        The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs">KIP-173</a>
+        enables you to provide topic configuration parameters for any topics created by Kafka Streams.
+        This includes repartition and changelog topics.
+        You can provide the configs via the <code>StreamsConfig</code> by adding the configs with the prefix as defined by <code>StreamsConfig#topicPrefix(String)</code>.
+        Any properties in the <code>StreamsConfig</code> with the prefix will be applied when creating internal topics.
+        Any configs that aren't topic configs will be ignored.
+        If you already use <code>StateStoreSupplier</code> or <code>Materialized</code> to provide configs for changelogs, then they will take precedence over those supplied in the config.
+    </p>
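+
+    <p>
+        As a sketch, the following sets the cleanup policy and segment size for all internal topics created by the application (the concrete values are arbitrary examples):
+    </p>
+
+    <pre class="brush: java;">
+        Properties props = new Properties();
+        // applied to repartition and changelog topics created by Kafka Streams; values are illustrative only
+        props.put(StreamsConfig.topicPrefix(TopicConfig.CLEANUP_POLICY_CONFIG), TopicConfig.CLEANUP_POLICY_COMPACT);
+        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 64 * 1024 * 1024);
+    </pre>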
+
+    <h3><a id="streams_api_changes_0110" href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
+
+    <p> Updates in <code>StreamsConfig</code>: </p>
+    <ul>
+        <li> new configuration parameter <code>processing.guarantee</code> is added </li>
+        <li> configuration parameter <code>key.serde</code> was deprecated and replaced by <code>default.key.serde</code> </li>
+        <li> configuration parameter <code>value.serde</code> was deprecated and replaced by <code>default.value.serde</code> </li>
+        <li> configuration parameter <code>timestamp.extractor</code> was deprecated and replaced by <code>default.timestamp.extractor</code> </li>
+        <li> method <code>#keySerde()</code> was deprecated and replaced by <code>#defaultKeySerde()</code> </li>
+        <li> method <code>#valueSerde()</code> was deprecated and replaced by <code>#defaultValueSerde()</code> </li>
+        <li> new method <code>#defaultTimestampExtractor()</code> was added </li>
+    </ul>
+
+    <p> New methods in <code>TopologyBuilder</code>: </p>
+    <ul>
+        <li> added overloads for <code>#addSource()</code> that allow defining a <code>TimestampExtractor</code> per source node </li>
+        <li> added overloads for <code>#addGlobalStore()</code> that allow defining a <code>TimestampExtractor</code> per source node associated with the global store </li>
+    </ul>
+
+    <p> New methods in <code>KStreamBuilder</code>: </p>
+    <ul>
+        <li> added overloads for <code>#stream()</code> that allow defining a <code>TimestampExtractor</code> per input stream </li>
+        <li> added overloads for <code>#table()</code> that allow defining a <code>TimestampExtractor</code> per input table </li>
+        <li> added overloads for <code>#globalKTable()</code> that allow defining a <code>TimestampExtractor</code> per global table </li>
+    </ul>
+
+    <p> Deprecated methods in <code>KTable</code>: </p>
+    <ul>
+        <li> <code>void foreach(final ForeachAction&lt;? super K, ? super V&gt; action)</code> </li>
+        <li> <code>void print()</code> </li>
+        <li> <code>void print(final String streamName)</code> </li>
+        <li> <code>void print(final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde)</code> </li>
+        <li> <code>void print(final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde, final String streamName)</code> </li>
+        <li> <code>void writeAsText(final String filePath)</code> </li>
+        <li> <code>void writeAsText(final String filePath, final String streamName)</code> </li>
+        <li> <code>void writeAsText(final String filePath, final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde)</code> </li>
+        <li> <code>void writeAsText(final String filePath, final String streamName, final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde)</code> </li>
+    </ul>
+
+    <p>
+        The above methods have been deprecated in favor of using the Interactive Queries API.
+        If you want to query the current content of the state store backing the KTable, use the following approach (a combined sketch is shown after the list):
+    </p>
+    <ul>
+        <li> Make a call to <code>KafkaStreams.store(final String storeName, final QueryableStoreType&lt;T&gt; queryableStoreType)</code> </li>
+        <li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to iterate over the keys of a <code>KTable</code>. </li>
+    </ul>
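+    <p>
+        A combined sketch of these two steps, assuming the store backing the <code>KTable</code> is named "word-counts" (a hypothetical name) and holds <code>String</code> keys and <code>Long</code> values:
+    </p>
+    <pre class="brush: java;">
+        // "word-counts" is a hypothetical store name used for illustration
+        ReadOnlyKeyValueStore&lt;String, Long&gt; store =
+                streams.store("word-counts", QueryableStoreTypes.keyValueStore());
+        try (KeyValueIterator&lt;String, Long&gt; all = store.all()) {
+            while (all.hasNext()) {
+                KeyValue&lt;String, Long&gt; entry = all.next();
+                System.out.println(entry.key + ": " + entry.value);
+            }
+        }
+    </pre>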
+    <p>
+        If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print(Printed.toSysOut())</code>.
+    </p>
+
+    <p> Metrics using exactly-once semantics: </p>
+    <p>
+        If exactly-once processing is enabled via the <code>processing.guarantee</code> parameter, internally Streams switches from a producer-per-thread to a producer-per-task runtime model.
+        In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task ID in this case.
+        Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
+    </p>
+
+    <p> Producer's <code>client.id</code> naming schema: </p>
+    <ul>
+        <li> at-least-once (default): <code>[client.Id]-StreamThread-[sequence-number]</code> </li>
+        <li> exactly-once: <code>[client.Id]-StreamThread-[sequence-number]-[taskId]</code> </li>
+    </ul>
+    <p> <code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
+
+    <h3><a id="streams_api_changes_01021" href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
+
+    <p>
+        Parameter updates in <code>StreamsConfig</code>:
+    </p>
+    <ul>
+        <li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
+    </ul>
+
+    <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
+
+    <p>
+        New methods in <code>KafkaStreams</code>:
+    </p>
+    <ul>
+        <li> set a listener to react on application state change via <code>#setStateListener(StateListener listener)</code> </li>
+        <li> retrieve the current application state via <code>#state()</code> </li>
+        <li> retrieve the global metrics registry via <code>#metrics()</code> </li>
+        <li> apply a timeout when closing an application via <code>#close(long timeout, TimeUnit timeUnit)</code> </li>
+        <li> specify a custom indent when retrieving Kafka Streams information via <code>#toString(String indent)</code> </li>
+    </ul>
+
+    <p>
+        Parameter updates in <code>StreamsConfig</code>:
+    </p>
+    <ul>
+        <li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol
+            (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-TopicAdminSchema.1">KIP-4, Section "Topic Admin Schema"</a>) </li>
+        <li> added many new parameters for metrics, security, and client configurations </li>
+    </ul>
+
+    <p> Changes in <code>StreamsMetrics</code> interface: </p>
+    <ul>
+        <li> removed methods: <code>#addLatencySensor()</code> </li>
+        <li> added methods: <code>#addLatencyAndThroughputSensor()</code>, <code>#addThroughputSensor()</code>, <code>#recordThroughput()</code>,
+            <code>#addSensor()</code>, <code>#removeSensor()</code> </li>
+    </ul>
+
+    <p> New methods in <code>TopologyBuilder</code>: </p>
+    <ul>
+        <li> added overloads for <code>#addSource()</code> that allow defining an <code>auto.offset.reset</code> policy per source node </li>
+        <li> added methods <code>#addGlobalStore()</code> to add global <code>StateStore</code>s </li>
+    </ul>
+
+    <p> New methods in <code>KStreamBuilder</code>: </p>
+    <ul>
+        <li> added overloads for <code>#stream()</code> and <code>#table()</code> that allow defining an <code>auto.offset.reset</code> policy per input stream/table </li>
+        <li> added method <code>#globalKTable()</code> to create a <code>GlobalKTable</code> </li>
+    </ul>
+
+    <p> New joins for <code>KStream</code>: </p>
+    <ul>
+        <li> added overloads for <code>#join()</code> to join with <code>KTable</code> </li>
+        <li> added overloads for <code>#join()</code> and <code>leftJoin()</code> to join with <code>GlobalKTable</code> </li>
+        <li> note, join semantics in 0.10.2 were improved and thus you might see different results compared to 0.10.0.x and 0.10.1.x
+            (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">Kafka Streams Join Semantics</a> in the Apache Kafka wiki) </li>
+    </ul>
+
+    <p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
+    <ul>
+        <li> like all other KTable operations, <code>KTable-KTable</code> joins do not throw an exception on <code>null</code> key records anymore, but drop those records silently </li>
+    </ul>
+
+    <p> New window type <em>Session Windows</em>: </p>
+    <ul>
+        <li> added class <code>SessionWindows</code> to specify session windows </li>
+        <li> added overloads for <code>KGroupedStream</code> methods <code>#count()</code>, <code>#reduce()</code>, and <code>#aggregate()</code>
+            to allow session window aggregations </li>
+    </ul>
+
+    <p> Changes to <code>TimestampExtractor</code>: </p>
+    <ul>
+        <li> method <code>#extract()</code> has a second parameter now </li>
+        <li> new default timestamp extractor class <code>FailOnInvalidTimestamp</code>
+            (it gives the same behavior as the old (and removed) default extractor <code>ConsumerRecordTimestampExtractor</code>) </li>
+        <li> new alternative timestamp extractor classes <code>LogAndSkipOnInvalidTimestamp</code> and <code>UsePreviousTimeOnInvalidTimestamp</code> </li>
+    </ul>
+
+    <p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API">KIP-100</a>). </p>
+
+    <h3><a id="streams_api_changes_0101" href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
+
+    <p> Stream grouping and aggregation split into two methods: </p>
+    <ul>
+        <li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
+        <li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
+        <li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
+    </ul>
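+
+    <p> A minimal before/after sketch of this migration, assuming a <code>KStream&lt;String, String&gt;</code> named <code>stream</code> and an illustrative store name: </p>
+
+    <pre class="brush: java;">
+        import org.apache.kafka.streams.kstream.KTable;
+
+        // before (0.10.0.x):
+        //     KTable&lt;String, Long&gt; counts = stream.countByKey("counts-store");
+
+        // after (0.10.1.0 and newer): group first, then aggregate
+        KTable&lt;String, Long&gt; counts = stream.groupByKey().count("counts-store");
+    </pre>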
+
+    <p> Auto Repartitioning: </p>
+    <ul>
+        <li> a call to <code>through()</code> after a key-changing operator and before an aggregation/join is no longer required </li>
+        <li> Example: <code>stream.selectKey(...).through(...).countByKey()</code> changes to <code>stream.selectKey(...).groupByKey().count()</code> </li>
+    </ul>
+
+    <p> TopologyBuilder: </p>
+    <ul>
+        <li> methods <code>#sourceTopics(String applicationId)</code> and <code>#topicGroups(String applicationId)</code> were simplified to <code>#sourceTopics()</code> and <code>#topicGroups()</code> </li>
+    </ul>
+
+    <p> DSL: new parameter to specify state store names: </p>
+    <ul>
+        <li> The new Interactive Queries feature requires a store name to be specified for all source KTables and window aggregation result KTables (the previous "operator/window name" parameter is now the store name) </li>
+        <li> <code>KStreamBuilder#table(String topic)</code> changes to <code>#table(String topic, String storeName)</code> </li>
+        <li> <code>KTable#through(String topic)</code> changes to <code>#through(String topic, String storeName)</code> </li>
+        <li> <code>KGroupedStream#aggregate()</code>, <code>#reduce()</code>, and <code>#count()</code> require an additional <code>String storeName</code> parameter </li>
+        <li> Example: <code>stream.countByKey(TimeWindows.of("windowName", 1000))</code> changes to <code>stream.groupByKey().count(TimeWindows.of(1000), "countStoreName")</code> </li>
+    </ul>
+
+    <p> Windowing: </p>
+    <ul>
+        <li> Windows are not named anymore: <code>TimeWindows.of("name", 1000)</code> changes to <code>TimeWindows.of(1000)</code> (cf. DSL: new parameter to specify state store names) </li>
+        <li> JoinWindows has no default size anymore: <code>JoinWindows.of("name").within(1000)</code> changes to <code>JoinWindows.of(1000)</code> </li>
+    </ul>
+
+    <div class="pagination">
+        <a href="/{{version}}/documentation/streams/architecture" class="pagination__btn pagination__btn__prev">Previous</a>
+        <a href="#" class="pagination__btn pagination__btn__next pagination__btn--disabled">Next</a>
+    </div>
+</script>
+
+<!--#include virtual="../../includes/_header.htm" -->
+<!--#include virtual="../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+            <li><a href="/documentation/streams">Kafka Streams</a></li>
+        </ul>
+        <div class="p-content"></div>
+    </div>
+</div>
+<!--#include virtual="../../includes/_footer.htm" -->
+<script>
+$(function() {
+  // Show selected style on nav item
+  $('.b-nav__streams').addClass('selected');
+
+  // Display docs subnav items
+  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+});
+</script>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide.html b/documentation/streams/developer-guide.html
deleted file mode 100644
index 1fa1d01..0000000
--- a/documentation/streams/developer-guide.html
+++ /dev/null
@@ -1,2 +0,0 @@
-<!-- should always link the the latest release's documentation -->
-<!--#include virtual="../../10/streams/developer-guide.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/app-reset-tool.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/app-reset-tool.html b/documentation/streams/developer-guide/app-reset-tool.html
new file mode 100644
index 0000000..bcb542d
--- /dev/null
+++ b/documentation/streams/developer-guide/app-reset-tool.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/app-reset-tool.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/config-streams.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/config-streams.html b/documentation/streams/developer-guide/config-streams.html
new file mode 100644
index 0000000..bcb542d
--- /dev/null
+++ b/documentation/streams/developer-guide/config-streams.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/config-streams.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/datatypes.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/datatypes.html b/documentation/streams/developer-guide/datatypes.html
new file mode 100644
index 0000000..a4444f1
--- /dev/null
+++ b/documentation/streams/developer-guide/datatypes.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/datatypes.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/dsl-api.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/dsl-api.html b/documentation/streams/developer-guide/dsl-api.html
new file mode 100644
index 0000000..4095045
--- /dev/null
+++ b/documentation/streams/developer-guide/dsl-api.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/dsl-api.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/index.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/index.html b/documentation/streams/developer-guide/index.html
new file mode 100644
index 0000000..c8efe57
--- /dev/null
+++ b/documentation/streams/developer-guide/index.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/index.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/interactive-queries.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/interactive-queries.html b/documentation/streams/developer-guide/interactive-queries.html
new file mode 100644
index 0000000..2a011d9
--- /dev/null
+++ b/documentation/streams/developer-guide/interactive-queries.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/interactive-queries.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/manage-topics.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/manage-topics.html b/documentation/streams/developer-guide/manage-topics.html
new file mode 100644
index 0000000..83344d6
--- /dev/null
+++ b/documentation/streams/developer-guide/manage-topics.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/manage-topics.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/memory-mgmt.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/memory-mgmt.html b/documentation/streams/developer-guide/memory-mgmt.html
new file mode 100644
index 0000000..e0323be
--- /dev/null
+++ b/documentation/streams/developer-guide/memory-mgmt.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/memory-mgmt.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/processor-api.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/processor-api.html b/documentation/streams/developer-guide/processor-api.html
new file mode 100644
index 0000000..61c64f4
--- /dev/null
+++ b/documentation/streams/developer-guide/processor-api.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/processor-api.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/running-app.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/running-app.html b/documentation/streams/developer-guide/running-app.html
new file mode 100644
index 0000000..a6759fd
--- /dev/null
+++ b/documentation/streams/developer-guide/running-app.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/running-app.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/security.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/security.html b/documentation/streams/developer-guide/security.html
new file mode 100644
index 0000000..954f259
--- /dev/null
+++ b/documentation/streams/developer-guide/security.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/security.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/documentation/streams/developer-guide/write-streams.html
----------------------------------------------------------------------
diff --git a/documentation/streams/developer-guide/write-streams.html b/documentation/streams/developer-guide/write-streams.html
new file mode 100644
index 0000000..7030bf4
--- /dev/null
+++ b/documentation/streams/developer-guide/write-streams.html
@@ -0,0 +1,2 @@
+<!-- should always link to the latest release's documentation -->
+<!--#include virtual="../../../10/streams/developer-guide/write-streams.html" -->

