kafka-commits mailing list archives

From mj...@apache.org
Subject [kafka-site] branch asf-site updated: Webpage updated for 0.11.0.3
Date Tue, 03 Jul 2018 18:15:35 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 05f4cc8  Webpage updated for 0.11.0.3
05f4cc8 is described below

commit 05f4cc8b71c6c28e56df29a0c71db3f9a7911642
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Tue Jul 3 09:22:25 2018 -0700

    Webpage updated for 0.11.0.3
---
 0110/generated/producer_config.html |    2 +-
 0110/generated/streams_config.html  |    4 +-
 0110/introduction.html              |   31 +-
 0110/streams/core-concepts.html     |   28 +-
 0110/streams/developer-guide.html   | 2073 +++--------------------------------
 0110/streams/index.html             |  592 +++++-----
 0110/streams/quickstart.html        |   38 +-
 0110/streams/tutorial.html          |   28 +-
 0110/streams/upgrade-guide.html     |   29 +-
 0110/upgrade.html                   |   67 +-
 0110/uses.html                      |    2 +-
 downloads.html                      |   22 +
 12 files changed, 554 insertions(+), 2362 deletions(-)

diff --git a/0110/generated/producer_config.html b/0110/generated/producer_config.html
index 2fd1891..9a25249 100644
--- a/0110/generated/producer_config.html
+++ b/0110/generated/producer_config.html
@@ -112,5 +112,5 @@
 <tr>
 <td>transaction.timeout.ms</td><td>The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with an `InvalidTransactionTimeout` error.</td><td>int</td><td>60000</td><td></td><td>low</td></tr>
 <tr>
-<td>transactional.id</td><td>The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is em [...]
+<td>transactional.id</td><td>The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is <c [...]
 </tbody></table>
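
For context, a minimal sketch of how transactional.id and transaction.timeout.ms from the table above are typically wired into a producer; the broker address, topic, and id are placeholders, and this assumes a 0.11+ client against a broker with transactions enabled:

<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");        // placeholder broker
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("enable.idempotence", "true");                  // required when a transactional.id is set
props.put("transactional.id", "my-transactional-id");     // illustrative id
props.put("transaction.timeout.ms", "60000");             // default shown in the table above

KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord&lt;&gt;("my-topic", "key", "value"));
producer.commitTransaction();                              // or abortTransaction() on error
producer.close();
</pre>
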
diff --git a/0110/generated/streams_config.html b/0110/generated/streams_config.html
index dad8110..45bfc5e 100644
--- a/0110/generated/streams_config.html
+++ b/0110/generated/streams_config.html
@@ -30,7 +30,7 @@
 <tr>
 <td>num.stream.threads</td><td>The number of threads to execute stream processing.</td><td>int</td><td>1</td><td></td><td>medium</td></tr>
 <tr>
-<td>processing.guarantee</td><td>The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>.</td><td>string</td><td>at_least_once</td><td>[at_least_once, exactly_once]</td><td>medium</td></tr>
+<td>processing.guarantee</td><td>The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>. Note that exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production; for development you can change this by adjusting the broker setting `transaction.state.log.replication.factor`.</td><td>string</td><td>at_least_once</td><td>[at_least_once, exactly_once]</td [...]
 <tr>
 <td>security.protocol</td><td>Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.</td><td>string</td><td>PLAINTEXT</td><td></td><td>medium</td></tr>
 <tr>
@@ -76,6 +76,8 @@
 <tr>
 <td>timestamp.extractor</td><td>Timestamp extractor class that implements the <code>TimestampExtractor</code> interface. This config is deprecated, use <code>default.timestamp.extractor</code> instead</td><td>class</td><td>null</td><td></td><td>low</td></tr>
 <tr>
+<td>upgrade.from</td><td>Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. Default is null. Accepted values are "0.10.0" (for upgrading from 0.10.0.x).</td><td>string</td><td>null</td><td>[null, 0.10.0]</td><td>low</td></tr>
+<tr>
 <td>value.serde</td><td>Serializer / deserializer class for value that implements the <code>Serde</code> interface. This config is deprecated, use <code>default.value.serde</code> instead</td><td>class</td><td>null</td><td></td><td>low</td></tr>
 <tr>
 <td>windowstore.changelog.additional.retention.ms</td><td>Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day</td><td>long</td><td>86400000</td><td></td><td>low</td></tr>
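
As a rough illustration of how the processing.guarantee and upgrade.from entries above are set in practice, the following sketch builds a Streams configuration; the application id and bootstrap servers are placeholders:

<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker

// exactly-once requires transaction.state.log.replication.factor of at least 3
// on the brokers (the default); lower it only on small development clusters
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

// only set when rolling-bounce upgrading an application that was running 0.10.0.x
props.put("upgrade.from", "0.10.0");
</pre>
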
diff --git a/0110/introduction.html b/0110/introduction.html
index 5b3bb4a..e20ae71 100644
--- a/0110/introduction.html
+++ b/0110/introduction.html
@@ -19,22 +19,21 @@
 
 <script id="introduction-template" type="text/x-handlebars-template">
   <h3> Apache Kafka&reg; is <i>a distributed streaming platform</i>. What exactly does that mean?</h3>
-  <p>We think of a streaming platform as having three key capabilities:</p>
-  <ol>
-    <li>It lets you publish and subscribe to streams of records. In this respect it is similar to a message queue or enterprise messaging system.
-    <li>It lets you store streams of records in a fault-tolerant way.
-    <li>It lets you process streams of records as they occur.
-  </ol>
-  <p>What is Kafka good for?</p>
-  <p>It gets used for two broad classes of application:</p>
-  <ol>
+  <p>A streaming platform has three key capabilities:</p>
+  <ul>
+    <li>Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
+    <li>Store streams of records in a fault-tolerant durable way.
+    <li>Process streams of records as they occur.
+  </ul>
+  <p>Kafka is generally used for two broad classes of applications:</p>
+  <ul>
     <li>Building real-time streaming data pipelines that reliably get data between systems or applications
     <li>Building real-time streaming applications that transform or react to the streams of data
-  </ol>
+  </ul>
   <p>To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.</p>
   <p>First a few concepts:</p>
   <ul>
-    <li>Kafka is run as a cluster on one or more servers.
+    <li>Kafka is run as a cluster on one or more servers that can span multiple datacenters.
       <li>The Kafka cluster stores streams of <i>records</i> in categories called <i>topics</i>.
     <li>Each record consists of a key, a value, and a timestamp.
   </ul>
@@ -60,7 +59,7 @@
   <p> Each partition is an ordered, immutable sequence of records that is continually appended to&mdash;a structured commit log. The records in the partitions are each assigned a sequential id number called the <i>offset</i> that uniquely identifies each record within the partition.
   </p>
   <p>
-  The Kafka cluster retains all published records&mdash;whether or not they have been consumed&mdash;using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so storing data for a long time is not a problem.
+  The Kafka cluster durably persists all published records&mdash;whether or not they have been consumed&mdash;using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so storing data for a long time is not a problem.
   </p>
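
To make the two-day retention example concrete, a small sketch using the 0.11 AdminClient to apply a per-topic override; the topic name and broker address are placeholders, and the same override can equally be applied via the kafka-configs tool or broker defaults:

<pre class="brush: java;">
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");          // placeholder broker

AdminClient admin = AdminClient.create(props);
// retain records of "my-topic" for two days (172800000 ms), overriding the broker default
ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
Config retention = new Config(
        Collections.singletonList(new ConfigEntry("retention.ms", "172800000")));
admin.alterConfigs(Collections.singletonMap(topic, retention));   // block on the result if confirmation is needed
admin.close();
</pre>
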
   <img class="centered" src="/{{version}}/images/log_consumer.png" style="width:400px">
   <p>
@@ -82,6 +81,10 @@
   Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
   </p>
 
+  <h4><a id="intro_geo-replication" href="#intro_geo-replication">Geo-Replication</a></h4>
+
+  <p>Kafka MirrorMaker provides geo-replication support for your clusters. With MirrorMaker, messages are replicated across multiple datacenters or cloud regions. You can use this in active/passive scenarios for backup and recovery, or in active/active scenarios to place data closer to your users or to support data locality requirements.</p>
+
   <h4><a id="intro_producers" href="#intro_producers">Producers</a></h4>
   <p>
   Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!
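
A brief sketch of the key-based case described above; the topic, key, and broker address are illustrative. Records that share a key are hashed to the same partition, while keyless records are spread across partitions by the default partitioner:

<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
// both records carry the key "user-42" and therefore land in the same partition,
// so consumers of that partition see them in the order they were sent
producer.send(new ProducerRecord&lt;&gt;("page-views", "user-42", "/index.html"));
producer.send(new ProducerRecord&lt;&gt;("page-views", "user-42", "/checkout"));
// a record without a key is assigned a partition by the default partitioner
producer.send(new ProducerRecord&lt;&gt;("page-views", null, "anonymous-view"));
producer.close();
</pre>
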
@@ -111,6 +114,8 @@
   <p>
   Kafka only provides a total order over records <i>within</i> a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.
   </p>
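
The per-partition ordering guarantee is easiest to see from the consumer side; a minimal sketch, with the group id and topic names chosen for illustration:

<pre class="brush: java;">
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "page-view-consumers");   // partitions are divided among group members
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
consumer.subscribe(Arrays.asList("page-views"));
while (true) {
    ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
    for (ConsumerRecord&lt;String, String&gt; record : records) {
        // offsets increase monotonically within a partition;
        // ordering across partitions is not defined
        System.out.printf("partition=%d offset=%d key=%s%n",
                record.partition(), record.offset(), record.key());
    }
}
</pre>
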
+  <h4><a id="intro_multi-tenancy" href="#intro_multi-tenancy">Multi-tenancy</a></h4>
+  <p>You can deploy Kafka as a multi-tenant solution. Multi-tenancy is enabled by configuring which topics can be produced to or consumed from. There is also operations support for quotas. Administrators can define and enforce quotas on requests to control the broker resources that are used by clients. For more information, see the <a href="https://kafka.apache.org/documentation/#security">security documentation</a>.</p>
   <h4><a id="intro_guarantees" href="#intro_guarantees">Guarantees</a></h4>
   <p>
   At a high-level Kafka gives the following guarantees:
@@ -202,7 +207,7 @@
  Likewise for streaming data pipelines the combination of subscription to real-time events makes it possible to use Kafka for very low-latency pipelines; but the ability to store data reliably makes it possible to use it for critical data where the delivery of data must be guaranteed or for integration with offline systems that load data only periodically or may go down for extended periods of time for maintenance. The stream processing facilities make it possible to transform data as it  [...]
   </p>
   <p>
-  For more information on the guarantees, APIs, and capabilities Kafka provides see the rest of the <a href="/documentation.html">documentation</a>.
+  For more information on the guarantees, apis, and capabilities Kafka provides see the rest of the <a href="/documentation.html">documentation</a>.
   </p>
 </script>
 
diff --git a/0110/streams/core-concepts.html b/0110/streams/core-concepts.html
index 99c3479..7349c3a 100644
--- a/0110/streams/core-concepts.html
+++ b/0110/streams/core-concepts.html
@@ -19,18 +19,7 @@
 
 <script id="content-template" type="text/x-handlebars-template">
     <h1>Core Concepts</h1>
-    <div class="sub-nav-sticky">
-      <div class="sticky-top">
-        <div style="height:35px">
-          <a href="/{{version}}/documentation/streams/">Introduction</a>
-          <a href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
-          <a class="active-menu-item" href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
-          <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
-          <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
-        </div>
-      </div>
-    </div> 
-    
+
     <p>
         Kafka Streams is a client library for processing and analyzing data stored in Kafka.
         It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state.
@@ -191,20 +180,7 @@
 $(function() {
   // Show selected style on nav item
   $('.b-nav__streams').addClass('selected');
-     //sticky secondary nav
-          var $navbar = $(".sub-nav-sticky"),
-               y_pos = $navbar.offset().top,
-               height = $navbar.height();
-       
-           $(window).scroll(function() {
-               var scrollTop = $(window).scrollTop();
-           
-               if (scrollTop > y_pos - height) {
-                   $navbar.addClass("navbar-fixed")
-               } else if (scrollTop <= y_pos) {
-                   $navbar.removeClass("navbar-fixed")
-               }
-    });
+
   // Display docs subnav items
   $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
 });
diff --git a/0110/streams/developer-guide.html b/0110/streams/developer-guide.html
index b279a84..15298a7 100644
--- a/0110/streams/developer-guide.html
+++ b/0110/streams/developer-guide.html
@@ -18,79 +18,9 @@
 <script><!--#include virtual="../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-   <h1>Developer Guide for Kafka Streams API</h1> 
-    <div class="sub-nav-sticky">
-      <div class="sticky-top">
-        <div style="height:35px">
-          <a href="/{{version}}/documentation/streams/">Introduction</a>
-          <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
-          <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
-          <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
-          <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
-        </div>
-      </div>
-    </div> 
+    <h1>Developer Manual</h1>
 
     <p>
-        This developer guide describes how to write, configure, and execute a Kafka Streams application. There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that provides how to run a stream processing program coded in the Kafka Streams library.
-    </p>
-
-    <p>
-        The computational logic of a Kafka Streams application is defined as a <a href="/{{version}}/documentation/streams/core-concepts#streams_topology">processor topology</a>. Kafka Streams provide two sets of APIs to define the processor topology, Low-Level Processor API and High-Level Streams DSL.
-    </p>
-
-    <ul class="toc">
-        <li><a href="#streams_processor">1. Low-level Processor API</a>
-            <ul>
-                <li><a href="#streams_processor_process">1.1 Processor</a>
-                <li><a href="#streams_processor_topology">1.2 Processor Topology</a>
-                <li><a href="#streams_processor_statestore">1.3 State Stores</a>
-                <li><a href="#restoration_progress">1.4 Monitoring the Restoration Progress of Fault-tolerant State Store</a>
-                <li><a href="#disable-changelogs">1.5 Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a>
-                <li><a href="#implementing-custom-state-stores">1.6 Implementing Custom State Stores</a>
-                <li><a href="#connecting-processors-and-state-stores">1.7 Connecting Processors and State Stores</a>
-                <li><a href="#streams_processor_describe">1.5 Describe a Topology</a>
-            </ul>
-        </li>
-        <li><a href="#streams_dsl">2. High-Level Streams DSL</a>
-            <ul>
-                <li><a href="#streams_duality">2.1 Duality of Streams and Tables</a>
-                <li><a href="#streams_dsl_source">2.2 Creating Source Streams from Kafka</a>
-                <li><a href="#streams_dsl_transform">2.3 Transform a stream</a>
-                <li><a href="#streams_dsl_sink">2.4 Write streams back to Kafka</a>
-                <li><a href="#streams_dsl_build">2.5 Generate the processor topology</a>
-            </ul>
-        </li>
-        <li><a href="#streams_interactive_queries">3. Interactive Queries</a>
-            <ul>
-                <li><a href="#streams_developer-guide_interactive-queries_your_app">3.1 Your application and interactive queries</a>
-                <li><a href="#streams_developer-guide_interactive-queries_local-stores">3.2 Querying local state stores (for an application instance)</a>
-                <li><a href="#streams_developer-guide_interactive-queries_local-key-value-stores">3.3 Querying local key-value stores</a>
-                <li><a href="#streams_developer-guide_interactive-queries_local-window-stores">3.4 Querying local window stores</a>
-                <li><a href="#streams_developer-guide_interactive-queries_custom-stores">3.5 Querying local custom state stores</a>
-                <li><a href="#streams_developer-guide_interactive-queries_discovery">3.6 Querying remote state stores (for the entire application)</a>
-                <li><a href="#streams_developer-guide_interactive-queries_rpc-layer">3.7 Adding an RPC layer to your application</a>
-                <li><a href="#streams_developer-guide_interactive-queries_expose-rpc">3.8 Exposing the RPC endpoints of your application</a>
-                <li><a href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">3.9 Discovering and accessing application instances and their respective local state stores</a>
-            </ul>
-        </li>
-        <li><a href="#streams_developer-guide_memory-management">4. Memory Management</a>
-            <ul>
-                <li><a href="#streams_developer-guide_memory-management_record-cache">4.1 Record caches in the DSL</a>
-                <li><a href="#streams_developer-guide_memory-management_state-store-cache">4.2 State store caches in the Processor API</a>
-                <li><a href="#streams_developer-guide_memory-management_other_memory_usage">4.3 Other memory usage</a>
-            </ul>
-        </li>
-        <li><a href="#streams_configure_execute">5. Application Configuration and Execution</a>
-            <ul>
-                <li><a href="#streams_client_config">5.1 Producer and Consumer Configuration</a>
-                <li><a href="#streams_broker_config">5.2 Broker Configuration</a>
-                <li><a href="#streams_topic_config">5.3 Internal Topic Configuration</a>
-                <li><a href="#streams_execute">5.4 Executing Your Kafka Streams Application</a>
-            </ul>
-        </li>
-    </ul>
-    <p>
         There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program coded in the Kafka Streams library.
         This section focuses on how to write, configure, and execute a Kafka Streams application.
     </p>
@@ -105,17 +35,19 @@
     <h4><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h4>
 
     <p>
-        A <a href="/{{version}}/documentation/streams/core-concepts"><b>stream processor</b></a> is a node in the processor topology that represents a single processing step.
-        With the <code>Processor</code> API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with
+        As mentioned in the <a href="/{{version}}/documentation/streams/core-concepts"><b>Core Concepts</b></a> section, a stream processor is a node in the processor topology that represents a single processing step.
+        With the <code>Processor</code> API developers can define arbitrary stream processors that process one received record at a time, and connect these processors with
         their associated state stores to compose the processor topology that represents their customized processing logic.
     </p>
 
     <p>
-        The <code>Processor</code> interface provides the <code>process</code> method API, which is performed on each record that is received.
-        The processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the <code>init</code> method
-        and use the context to schedule a periodically called punctuation function (<code>context().schedule</code>),
-        to forward the new or modified key-value pair to downstream processors (<code>context().forward</code>),
-        to commit the current processing progress (<code>context().commit</code>), and so on.
+        The <code>Processor</code> interface provides two main API methods:
+        <code>process</code> and <code>punctuate</code>. The <code>process</code> method is performed on each
+        received record; and the <code>punctuate</code> method is performed periodically based on elapsed time.
+        In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the
+        <code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
+        forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current
+        processing progress (<code>context().commit</code>), etc.
     </p>
 
     <p>
@@ -133,26 +65,8 @@
     // keep the processor context locally because we need it in punctuate() and commit()
     this.context = context;
 
-    // schedule a punctuation method every 1000 milliseconds.
-    this.context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
-        @Override
-        public void punctuate(long timestamp) {
-            KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
-
-            while (iter.hasNext()) {
-                KeyValue&lt;String, Long&gt; entry = iter.next();
-                context.forward(entry.key, entry.value.toString());
-            }
-
-            // it is the caller's responsibility to close the iterator on state store;
-            // otherwise it may lead to memory and file handlers leak depending on the
-            // underlying state store implementation.
-            iter.close();
-
-            // commit the current processing progress
-            context.commit();
-        }
-        });
+    // call this processor's punctuate() method every 1000 milliseconds.
+    this.context.schedule(1000);
 
     // retrieve the key-value store named "Counts"
     this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
@@ -182,7 +96,7 @@
         context.forward(entry.key, entry.value.toString());
     }
 
-    iter.close(); // avoid OOM
+    iter.close();
     // commit the current processing progress
     context.commit();
     }
@@ -197,28 +111,27 @@
     </pre>
 
     <p>
-        In the previous example, the following actions are performed:
+        In the above implementation, the following actions are performed:
     </p>
 
     <ul>
         <li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li>
         <li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li>
-        <li>In the scheduled <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
-        <li>When done with the <code>KeyValueIterator&lt;String, Long&gt;</code> you <em>must</em> close the iterator, as shown above or use the try-with-resources statement.</li>
+        <li>In the <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
     </ul>
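
Pulling these fragments together, a condensed sketch of such a processor against the 0.11.0 <code>Processor</code> API could look as follows; the class name, word-splitting logic, and "Counts" store name simply follow the fragments above:

<pre class="brush: java;">
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor&lt;String, String&gt; {

    private ProcessorContext context;
    private KeyValueStore&lt;String, Long&gt; kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // call this processor's punctuate() method every 1000 milliseconds
        this.context.schedule(1000);
        this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
        for (String word : value.toLowerCase().split(" ")) {
            Long count = this.kvStore.get(word);
            this.kvStore.put(word, count == null ? 1L : count + 1);
        }
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue&lt;String, Long&gt; entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
        iter.close();       // store iterators must always be closed
        context.commit();
    }

    @Override
    public void close() {}
}
</pre>
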
 
 
     <h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
 
     <p>
-        With the customized processors defined in the Processor API, you can use <code>Topology</code> to build a processor topology
+        With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
         by connecting these processors together:
     </p>
 
     <pre class="brush: java;">
-    Topology topology = new Topology();
+    TopologyBuilder builder = new TopologyBuilder();
 
-    topology.addSource("SOURCE", "src-topic")
+    builder.addSource("SOURCE", "src-topic")
     // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
     .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
 
@@ -241,257 +154,63 @@
     .addSink("SINK3", "sink-topic3", "PROCESS3");
     </pre>
 
-    Here is a quick walk through of the previous code to build the topology:
+    There are several steps in the above code to build the topology, and here is a quick walk through:
 
     <ul>
-        <li>A source node (<code>"SOURCE"</code>) is added to the topology using the <code>addSource</code> method, with one Kafka topic (<code>"src-topic"</code>) fed to it.</li>
-        <li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the source node, but is the parent of the other two processors.</li>
-        <li>Three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
+        <li>First of all a source node named "SOURCE" is added to the topology using the <code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li>
+        <li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.</li>
+        <li>Finally three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
     </ul>
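
Once such a topology is defined, it is typically handed to a <code>KafkaStreams</code> instance to run. A minimal sketch, assuming the <code>builder</code> from the example above and placeholder configuration values:

<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-processor-app");    // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
// ... later, on shutdown:
streams.close();
</pre>
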
 
-<h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State Stores</a></h4>
-
-<p>
-To make state stores fault-tolerant (e.g., to recover from machine crashes) as well as to allow for state store migration without data loss (e.g., to migrate a stateful stream task from one machine to another when elastically adding or removing capacity from your application), a state store can be <strong>continuously backed up</strong> to a Kafka topic behind the scenes. 
-We sometimes refer to this topic as the state store's associated <em>changelog topic</em> or simply its <em>changelog</em>. 
-In the case of a machine failure, for example, the state store and thus the application's state can be fully restored from its changelog. 
-You can enable or disable this backup feature for a state store, and thus its fault tolerance.
-</p>
-
-<p>
-By default, persistent <strong>key-value stores</strong> are fault-tolerant. 
-They are backed by a <a href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. 
-The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.
-</p>
-
-<p>
-Similarly, persistent <strong>window stores</strong> are fault-tolerant. 
-They are backed by a topic that uses both <em>compaction</em> and <em>deletion</em>. 
-Using deletion in addition to compaction is required for the changelog topics of window stores because of the structure of the message keys that are being sent to the changelog topics: for window stores, the message keys are composite keys that include not only the &quot;normal&quot; key but also window timestamps. 
-For such composite keys it would not be sufficient to enable just compaction in order to prevent a changelog topic from growing out of bounds. 
-With deletion enabled, old windows that have expired will be cleaned up by Kafka's log cleaner as the log segments expire. 
-The default retention setting is <code>Windows#maintainMs()</code> + 1 day. This setting can be overriden by specifying <code>StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</code> in the <code>StreamsConfig</code>.
-</p>
-
-<p>
-One additional note regarding the use of state stores.  Any time you open an <code>Iterator</code> from a state store you <em>must</em> call <code>close()</code> on the iterator
-when you are done working with it to reclaim resources.  Or you can use the iterator from within a try-with-resources statement.
-    By not closing an iterator, you may likely encounter an OOM error.
-</p>
-
-
-<h4><a id="restoration_progress" href="#restoration_progress">Monitoring the Restoration Progress of Fault-tolerant State Stores</a></h4>
-
-<p>
-When starting up your application any fault-tolerant state stores don't need a restoration process as the persisted state is read from local disk. 
-But there could be situations when a full restore from the backing changelog topic is required (e.g., a failure wiped out the local state or your application runs in a stateless environment and persisted data is lost on re-starts).
-</p>
-
-<p>
-If you have a significant amount of data in the changelog topic, the restoration process could take a non-negligible amount of time. 
-Given that processing of new data won't start until the restoration process is completed, having a window into the progress of restoration is useful.
-</p>
-
-<p>
-In order to observe the restoration of all state stores you provide your application an instance of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface. 
-You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> by calling the <code>KafkaStreams#setGlobalStateRestoreListener</code> method.
-</p>
-
-<p>
- A basic implementation example that prints restoration status to the console:
-</p>
-
-<pre class="brush: java;">
-  import org.apache.kafka.common.TopicPartition;
-  import org.apache.kafka.streams.processor.StateRestoreListener;
-
-   public class ConsoleGlobalRestoreListerner implements StateRestoreListener {
-
-      @Override
-      public void onRestoreStart(final TopicPartition topicPartition,
-                                 final String storeName,
-                                 final long startingOffset,
-                                 final long endingOffset) {
-
-          System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
-          System.out.println(" total records to be restored " + (endingOffset - startingOffset));
-      }
-
-      @Override
-      public void onBatchRestored(final TopicPartition topicPartition,
-                                  final String storeName,
-                                  final long batchEndOffset,
-                                  final long numRestored) {
+    <h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State Stores</a></h4>
 
-          System.out.println("Restored batch " + numRestored + " for " + storeName + " partition " + topicPartition.partition());
-
-      }
-
-      @Override
-      public void onRestoreEnd(final TopicPartition topicPartition,
-                               final String storeName,
-                               final long totalRestored) {
-
-          System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
-      }
-  }
-</pre>
-
-<blockquote>
-<p>
-  The <code>StateRestoreListener</code> instance is shared across all <code>org.apache.kafka.streams.processor.internals.StreamThread</code> instances and it is assumed all methods are stateless. 
-  If any stateful operations are desired, then the user will need to provide synchronization internally.
-</p>
-</blockquote>
-
-<h4> <a id="disable-changelogs" href="#disable-changelogs">Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a></h4>
-
-<p>
-    You can enable or disable fault tolerance for a state store by enabling or disabling, respectively ,the changelogging of the store through <code>StateStoreBuilder#withLoggingEnabled(Map&lt;String, String&gt;)</code>
-    and <code>StateStoreBuilder#withLoggingDisabled()</code>.
-    You can also fine-tune the associated topic’s configuration if needed.
-</p>
-
-<p>Example for disabling fault-tolerance:</p>
-
-<pre class="brush: java;">
-
-  import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
-  import org.apache.kafka.streams.processor.state.StateStoreBuilder;
-  import org.apache.kafka.streams.state.Stores;
-
-  KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
-  StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
-                                                          Serdes.String(),
-                                                          Serdes.Long())
-                                    .withLoggingDisabled(); // disable backing up the store to a changelog topic
-
-</pre>
-
-<blockquote>
-<p>If the changelog is disabled then the attached state store is no longer fault tolerant and it can't have any standby replicas</p>
-</blockquote>
-
-<p>
-   Example for enabling fault tolerance, with additional changelog-topic configuration: You can add any log config 
-   from kafka.log.LogConfig|core/src/main/scala/kafka/log/LogConfig.scala#L61. Unrecognized configs will be ignored.
-</p>
-
-<pre class="brush: java;">
-
-  import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
-  import org.apache.kafka.streams.processor.state.StateStoreBuilder;
-  import org.apache.kafka.streams.state.Stores;
-
-  Map&lt;String, String&gt; changelogConfig = new HashMap();
-  // override min.insync.replicas
-  changelogConfig.put("min.insyc.replicas", "1")
-
-  KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
-  StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
-                                                          Serdes.String(),
-                                                          Serdes.Long())
-                                    .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings
-
-
-</pre>
-
-<h4><a id="implementing-custom-state-stores" href="#implementing-custom-state-stores">Implementing custom State Stores</a></h4>
-
-<p>
- Apart from using the built-in state store types, you can also implement your own. 
- The primary interface to implement for the store is <code>org.apache.kafka.streams.processor.StateStore</code>. 
- Beyond that, Kafka Streams also has a few extended interfaces such as <code>KeyValueStore</code>.
-</p>
-
-<p>
-  In addition to the actual store, you also need to provide a &quot;factory&quot; for the store by implementing the <code>org.apache.kafka.streams.processor.state.StoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
-</p>
-
-<p>
-  You also have the option of providing a <code>org.apache.kafka.streams.processor.StateRestoreCallback</code> instance used to restore the state store from its backing changelog topic. 
-  This is done via the <code>org.apache.kafka.streams.processor.ProcessorContext#register</code> call inside the <code>StateStore#init</code> all.
-</p>
-
-<pre class="brush: java;">
-  public void init(ProcessorContext context, StateStore store) {
-     context.register(store, false, stateRestoreCallBackIntance);
-   }    
-</pre>
-
-<p>
-  There is an additional interface <code>org.apache.kafka.streams.processor.BatchingStateRestoreCallback</code> that provides bulk restoration semantics vs. the single record-at-a-time restoration semantics offered by the <code>StateRestoreCallback</code> interface.
-</p>
-
-<p>
-  Addtionally there are two abstract classes that implement <code>StateRestoreCallback</code> or <code>BatchingStateRestoreCallback</code> in conjuntion with the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface (<code>org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback</code> and <code>org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback</code> respectively) that provide the ability for the state store to recieve notifi [...]
-  The <code>StateRestoreListener</code> in this case is per state store instance and is used for internal purposes such as updating config settings based on the status of the restoration process.
-</p>
-
-<h4><a id="connecting-processors-and-state-stores" href="#connecting-processors-and-state-stores">Connecting Processors and State Stores</a></h4>
-
-<p>
-Now that we have defined a processor (WordCountProcessor) and the state stores, we can now construct the processor topology by connecting these processors and state stores together by using the <code>Topology</code> instance. 
-In addition, users can add <em>source processors</em> with the specified Kafka topics to generate input data streams into the topology, and <em>sink processors</em> with the specified Kafka topics to generate output data streams out of the topology.
-</p>
-
-<pre class="brush: java;">
-       Topology topology = new Topology();
-
-      // add the source processor node that takes Kafka topic "source-topic" as input
-      topology.addSource("Source", "source-topic")
-
-      // add the WordCountProcessor node which takes the source processor as its upstream processor
-      .addProcessor("Process", () -> new WordCountProcessor(), "Source")
+    <p>
+        Note that the <code>Processor</code> API is not limited to only accessing the current records as they arrive in the <code>process()</code> method, but can also maintain processing states
+        that keep recently arrived records to use in stateful processing operations such as windowed joins or aggregation.
+        To take advantage of these states, users can define a state store by implementing the <code>StateStore</code> interface (the Kafka Streams library also has a few extended interfaces such as <code>KeyValueStore</code>);
+        in practice, though, users usually do not need to customize such a state store from scratch but can simply use the <code>Stores</code> factory to define a state store by specifying whether it should be persistent, log-backed, etc.
+        In the following example, a persistent key-value store named "Counts" with key type <code>String</code> and value type <code>Long</code> is created.
+    </p>
 
-      // add the count store associated with the WordCountProcessor processor
-      .addStateStore(countStoreSupplier, "Process")
+    <pre class="brush: java;">
+    StateStoreSupplier countStore = Stores.create("Counts")
+    .withKeys(Serdes.String())
+    .withValues(Serdes.Long())
+    .persistent()
+    .build();
+    </pre>
 
-      // add the sink processor node that takes Kafka topic "sink-topic" as output
-      // and the WordCountProcessor node as its upstream processor
-      .addSink("Sink", "sink-topic", "Process");
-</pre>
+    <p>
+        To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
+        processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created
+        state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+    </p>
 
-<p>There are several steps in the above implementation to build the topology, and here is a quick walk-through:</p>
-<ul>
-   <li>A source processor node named &quot;Source&quot; is added to the topology using the <code>addSource</code> method, with one Kafka topic &quot;source-topic&quot; fed to it.</li>
-   <li>A processor node named &quot;Process&quot; with the pre-defined <code>WordCountProcessor</code> logic is then added as the downstream processor of the &quot;Source&quot; node using the <code>addProcessor</code> method.</li>
-   <li>A predefined persistent key-value state store is created and associated with the &quot;Process&quot; node, using <code>countStoreSupplier</code>.</li>
-   <li>A sink processor node is then added to complete the topology using the <code>addSink</code> method, taking the &quot;Process&quot; node as its upstream processor and writing to a separate &quot;sink-topic&quot; Kafka topic.</li>
-</ul>
+    <pre class="brush: java;">
+    TopologyBuilder builder = new TopologyBuilder();
 
-<p>
-In this topology, the &quot;Process&quot; stream processor node is considered a downstream processor of the &quot;Source&quot; node, and an upstream processor of the &quot;Sink&quot; node. 
-As a result, whenever the &quot;Source&quot; node forward a newly fetched record from Kafka to its downstream &quot;Process&quot; node, <code>WordCountProcessor#process()</code> method is triggered to process the record and update the associated state store; and whenever <code>context#forward()</code> is called in the <code>Punctuator#punctuate()</code> method, the aggregate key-value pair will be sent via the &quot;Sink&quot; processor node to the Kafka topic &quot;sink-topic&quot;.
-Note that in the <code>WordCountProcessor</code> implementation, users need to refer to the same store name &quot;Counts&quot; when accessing the key-value store; otherwise an exception will be thrown at runtime, indicating that the state store cannot be found; also, if the state store itself is not associated with the processor in the <code>Topology</code> code, accessing it in the processor's <code>init()</code> method will also throw an exception at runtime, indicating the state store [...]
-</p>
+    builder.addSource("SOURCE", "src-topic")
 
+    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
+    // add the created state store "Counts" associated with processor "PROCESS1"
+    .addStateStore(countStore, "PROCESS1")
+    .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
 
-    <h4><a id="streams_processor_describe" href="#streams_processor_describe">Describe a <code>Topology</code></a></h4>
+    // connect the state store "Counts" with processor "PROCESS2"
+    .connectProcessorAndStateStores("PROCESS2", "Counts");
 
-    <p>
-        After a <code>Topology</code> is specified, it is possible to retrieve a description of the corresponding DAG via <code>#describe()</code> that returns a <code>TopologyDescription</code>.
-        A <code>TopologyDescription</code> contains all added source, processor, and sink nodes as well as all attached stores.
-        You can access the specified input and output topic names and patterns for source and sink nodes.
-        For processor nodes, the attached stores are added to the description.
-        Additionally, all nodes have a list to all their connected successor and predecessor nodes.
-        Thus, <code>TopologyDescritpion</code> allows to retrieve the DAG structure of the specified topology.
-        <br />
-        Note that global stores are listed explicitly because they are accessible by all nodes without the need to explicitly connect them.
-        Furthermore, nodes are grouped by <code>Sub-topologies</code>, where each sub-topology is a group of processor nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
-        During execution, each <code>Sub-topology</code> will be processed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
-        Thus, each <code>Sub-topology</code> describes an independent unit of works that can be executed by different threads in parallel.
-        <br />
-        Describing a <code>Topology</code> before starting your streams application with the specified topology is helpful to reason about tasks and thus maximum parallelism (we will talk about how to execute your written application later in this section).
-        It is also helpful to get insight into a <code>Topology</code> if it is not specified directly as described above but via Kafka Streams DSL (we will describe the DSL in the next section.
-    </p>
+    .addSink("SINK1", "sink-topic1", "PROCESS1")
+    .addSink("SINK2", "sink-topic2", "PROCESS2")
+    .addSink("SINK3", "sink-topic3", "PROCESS3");
+    </pre>
 
     In the next section we present another way to build the processor topology: the Kafka Streams DSL.
     <br>
 
     <h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
 
-    To build a <code>Topology</code> using the Streams DSL, developers can apply the <code>StreamsBuilder</code> class.
+    To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code> class, which extends the <code>TopologyBuilder</code>.
     A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
     through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example source
     code for details.
@@ -545,1438 +264,25 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     ("alice", 1) --> ("alice", 3)
     </pre>
 
-    If the stream is defined as a KStream and the stream processing application were to sum the values it would return <code>4</code>. If the stream is defined as a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered as an update.
+    If these records were a KStream and the stream processing application were to sum the values, it would return <code>4</code>. If these records were a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered as an update.
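
A rough sketch of that difference with the 0.11 DSL, assuming an input topic of (String, Long) records and illustrative topic and store names:

<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// stream semantics: every record counts, so summing ("alice", 1) and ("alice", 3) yields 4
KStream&lt;String, Long&gt; clicksStream =
        builder.stream(Serdes.String(), Serdes.Long(), "user-clicks");
KTable&lt;String, Long&gt; streamSums =
        clicksStream.groupByKey(Serdes.String(), Serdes.Long())
                    .reduce((v1, v2) -> v1 + v2, "stream-sums-store");

// table semantics: the later record is an update, so the value for "alice" is simply 3
KTable&lt;String, Long&gt; clicksTable =
        builder.table(Serdes.String(), Serdes.Long(), "user-clicks", "clicks-table-store");
</pre>
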
 
-    <h4><a id="streams_dsl_source" href="#streams_dsl_source">Creating Source Streams from Kafka</a></h4>
-
-    <p>
-    You can easily read data from Kafka topics into your application. We support the following operations.
-    </p>
-    <table class="data-table" border="1">
-        <tbody><tr>
-            <th>Reading from Kafka</th>
-            <th>Description</th>
-        </tr>
-        <tr>
-            <td><b>Stream</b>: input topic(s) &rarr; <code>KStream</code></td>
-            <td>Create a <code>KStream</code> from the specified Kafka input topic(s), interpreting the data as a record stream.
-                A <code>KStream</code> represents a partitioned record stream.
-                <p>
-                    Slightly simplified, in the case of a KStream, the local KStream instance of every application instance will be populated
-                    with data from only a <b>subset</b> of the partitions of the input topic. Collectively, i.e. across all application instances,
-                    all the partitions of the input topic will be read and processed.
-                </p>
-                <pre class="brush: java;">
-                    import org.apache.kafka.common.serialization.Serdes;
-                    import org.apache.kafka.streams.StreamsBuilder;
-                    import org.apache.kafka.streams.kstream.KStream;
-
-                    StreamsBuilder builder = new StreamsBuilder();
-
-                    KStream&lt;String, Long&gt; wordCounts = builder.stream(
-                        "word-counts-input-topic" /* input topic */,
-                        Consumed.with(Serdes.String(), Serdes.Long()); // define key and value serdes
-                </pre>
-                When to provide serdes explicitly:
-                <ul>
-                    <li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
-                    <li>You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic(s) do not match
-                        the configured default serdes. </li>
-                </ul>
-                Several variants of <code>stream</code> exist to e.g. specify a regex pattern for input topics to read from.</td>
-        </tr>
-        <tr>
-            <td><b>Table</b>: input topic(s) &rarr; <code>KTable</code></td>
-            <td>
-                Reads the specified Kafka input topic into a <code>KTable</code>. The topic is interpreted as a changelog stream,
-                where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or
-                as DELETE (when the value is null) for that key.
-                <p>
-                    Slightly simplified, in the case of a KTable, the local KTable instance of every application instance will be populated
-                    with data from only a subset of the partitions of the input topic. Collectively, i.e. across all application instances, all
-                    the partitions of the input topic will be read and processed.
-                </p>
-                <p>
-                You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
-                When a name is provided, the table can be queryied using <a href="#streams_interactive_queries">interactive queries</a>.
-                When a name is not provided the table will not queryable and an internal name will be provided for the state store.
-                </p>
-                <pre class="brush: java;">
-                    import org.apache.kafka.common.serialization.Serdes;
-                    import org.apache.kafka.streams.StreamsBuilder;
-                    import org.apache.kafka.streams.kstream.KTable;
-
-                    StreamsBuilder builder = new StreamsBuilder();
-
-                    KTable&lt;String, Long&gt; wordCounts = builder.table(
-                        Serdes.String(), /* key serde */
-                        Serdes.Long(),   /* value serde */
-                        "word-counts-input-topic", /* input topic */
-                        "word-counts-partitioned-store" /* table/store name */);
-                </pre>
-
-                When to provide serdes explicitly:
-                <ul>
-                    <li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
-                    <li>You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
-                        match the configured default serdes.</li>
-                </ul>
-
-                Several variants of <code>table</code> exist to e.g. specify the <code>auto.offset.reset</code>
-                policy to be used when reading from the input topic.
-            </td>
-        <tr>
-            <td><b>Global Table</b>: input topic &rarr; <code>GlobalKTable</code></td>
-            <td>
-                Reads the specified Kafka input topic into a <code>GlobalKTable</code>. The topic is interpreted as a changelog stream, where records
-                with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or as DELETE (when the
-                value is <code>null</code>) for that key.
-                <p>
-                    Slightly simplified, in the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be
-                    populated with data from all the partitions of the input topic. In other words, when using a global table, every application
-                    instance will get its own, full copy of the topic's data.
-                </p>
-                <p>
-                You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
-                When a name is provided, the table can be queryied using <a href="#streams_interactive_queries">interactive queries</a>.
-                When a name is not provided the table will not queryable and an internal name will be provided for the state store.
-                </p>
-                <pre class="brush: java;">
-                    import org.apache.kafka.common.serialization.Serdes;
-                    import org.apache.kafka.streams.StreamsBuilder;
-                    import org.apache.kafka.streams.kstream.GlobalKTable;
-
-                    StreamsBuilder builder = new StreamsBuilder();
-
-                    GlobalKTable&lt;String, Long&gt; wordCounts = builder.globalTable(
-                        Serdes.String(), /* key serde */
-                        Serdes.Long(),   /* value serde */
-                        "word-counts-input-topic", /* input topic */
-                        "word-counts-global-store" /* table/store name */);
-                </pre>
-
-                When to provide serdes explicitly:
-                <ul>
-                    <li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
-                    <li>You must specificy serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
-                        match the configured default serdes.</li>
-                </ul>
-                Several variants of <code>globalTable</code> exist to e.g. specify explicit serdes.
-
-            </td>
-        </tbody>
-    </table>
-
-    <h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h4>
-    <p>
-    <code>KStream</code> and <code>KTable</code> support a variety of transformation operations. Each of these operations
-    can be translated into one or more connected processors into the underlying processor topology. Since <code>KStream</code>
-    and <code>KTable</code> are strongly typed, all these transformation operations are defined as generic functions where
-    users could specify the input and output data types.
-    </p>
-    <p>
-    Some <code>KStream</code> transformations may generate one or more <code>KStream</code> objects (e.g., filter and
-    map on <code>KStream</code> generate another <code>KStream</code>, while branch on <code>KStream</code> can generate
-    multiple <code>KStream</code> instances) while some others may generate a <code>KTable</code> object (e.g., aggregation) interpreted
-    as the changelog stream to the resulted relation. This allows Kafka Streams to continuously update the computed value upon arrival
-    of late records after it has already been produced to the downstream transformation operators. As for <code>KTable</code>,
-    all its transformation operations can only generate another <code>KTable</code> (though the Kafka Streams DSL does
-    provide a special function to convert a <code>KTable</code> representation into a <code>KStream</code>, which we will
-    describe later). Nevertheless, all these transformation methods can be chained together to compose a complex processor topology.
-    </p>
-    <p>
-    We describe these transformation operations in the following subsections, categorizing them into two categories:
-    stateless and stateful transformations.
-    </p>
-    <h5><a id="streams_dsl_transformations_stateless" href="#streams_dsl_transformations_stateless">Stateless transformations</a></h5>
-    <p>
-    Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not
-    require a state store associated with the stream processor.
-    </p>
-    <table class="data-table" border="1">
-        <tbody><tr>
-            <th>Transformation</th>
-            <th>Description</th>
-        </tr>
-        <tr>
-            <td><b>Branch</b>: <code>KStream &rarr; KStream</code></td>
-            <td>
-                <p>
-                Branch (or split) a <code>KStream</code> based on the supplied predicates into one or more <code>KStream</code> instances.
-                </p>
-                <p>
-                Predicates are evaluated in order. A record is placed into one and only one output stream on the first match:
-                if the n-th predicate evaluates to true, the record is placed into the n-th stream. If no predicate matches,
-                the record is dropped.
-                </p>
-                <p>
-                Branching is useful, for example, to route records to different downstream topics.
-                </p>
-                <pre class="brush: java;">
-                    KStream&lt;String, Long&gt; stream = ...;
-                    KStream&lt;String, Long&gt;[] branches = stream.branch(
-                            (key, value) -> key.startsWith("A"), /* first predicate  */
-                            (key, value) -> key.startsWith("B"), /* second predicate */
-                            (key, value) -> true                 /* third predicate  */
-                    );
-                    // KStream branches[0] contains all records whose keys start with "A"
-                    // KStream branches[1] contains all records whose keys start with "B"
-                    // KStream branches[2] contains all other records
-                    // Java 7 example: cf. `filter` for how to create `Predicate` instances
-            </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Filter</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
-            <td>
-                <p>
-                Evaluates a boolean function for each element and retains those for which the function returns true.
-                </p>
-                <pre class="brush: java;">
-                     KStream&lt;String, Long&gt; stream = ...;
-                     KTable&lt;String, Long&gt; table = ...;
-                     // A filter that selects (keeps) only positive numbers
-                     // Java 8+ example, using lambda expressions
-                     KStream&lt;String, Long&gt; onlyPositives = stream.filter((key, value) -> value > 0);
-
-                     // Java 7 example
-                     KStream&lt;String, Long&gt; onlyPositives = stream.filter(
-                       new Predicate&lt;String, Long&gt;() {
-                         @Override
-                         public boolean test(String key, Long value) {
-                           return value > 0;
-                         }
-                       });
-
-                    // A filter on a KTable that materializes the result into a StateStore
-                    table.filter((key, value) -> value != 0, Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("filtered"));
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Inverse Filter</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
-            <td>
-                <p>
-                Evaluates a boolean function for each element and drops those for which the function returns true.
-                </p>
-                <pre class="brush: java;">
-                     KStream&lt;String, Long&gt; stream = ...;
-
-                     // An inverse filter that discards any negative numbers or zero
-                     // Java 8+ example, using lambda expressions
-                     KStream&lt;String, Long&gt; onlyPositives = stream.filterNot((key, value) -> value <= 0);
-
-                     // Java 7 example
-                     KStream&lt;String, Long&gt; onlyPositives = stream.filterNot(
-                      new Predicate&lt;String, Long&gt;() {
-                        @Override
-                        public boolean test(String key, Long value) {
-                            return value <= 0;
-                        }
-                     });
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>FlatMap</b>: <code>KStream &rarr; KStream </code></td>
-            <td>
-                <p>
-                Takes one record and produces zero, one, or more records. You can modify the record keys and values,
-                including their types.
-                </p>
-
-                <p>
-                Marks the stream for data re-partitioning: Applying a grouping or a join after <code>flatMap</code> will result in
-                re-partitioning of the records. If possible use <code>flatMapValues</code> instead, which will not cause data re-partitioning.
-                </p>
-                <pre class="brush: java;">
-                     KStream&lt;Long, String> stream = ...;
-                     KStream&lt;String, Integer&gt; transformed = stream.flatMap(
-                         // Here, we generate two output records for each input record.
-                         // We also change the key and value types.
-                         // Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
-                         (key, value) -> {
-                             List&lt;KeyValue&lt;String, Integer&gt;&gt; result = new LinkedList&lt;&gt;();
-                             result.add(KeyValue.pair(value.toUpperCase(), 1000));
-                             result.add(KeyValue.pair(value.toLowerCase(), 9000));
-                             return result;
-                         }
-                     );
-                     // Java 7 example: cf. `map` for how to create `KeyValueMapper` instances
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>FlatMap (values only)</b>: <code>KStream &rarr; KStream </code></td>
-            <td>
-                <p>
-                Takes one record and produces zero, one, or more records, while retaining the key of the original record.
-                You can modify the record values and the value type.
-                </p>
-                <p>
-                <code>flatMapValues</code> is preferable to <code>flatMap</code> because it will not cause data re-partitioning. However,
-                it does not allow you to modify the key or key type like <code>flatMap</code> does.
-                </p>
-                <pre class="brush: java;">
-                   // Split a sentence into words.
-                   KStream&lt;byte[], String&gt; sentences = ...;
-                   KStream&lt;byte[], String&gt; words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
-
-                   // Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances
-               </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Foreach</b>: <code>KStream &rarr; void </code></td>
-            <td>
-                <p>
-                Terminal operation. Performs a stateless action on each record.
-                </p>
-                <p>
-                Note on processing guarantees: Any side effects of an action (such as writing to external systems)
-                are not trackable by Kafka, which means they will typically not benefit from Kafka's processing guarantees.
-                </p>
-                <pre class="brush: java;">
-                       KStream&lt;String, Long&gt; stream = ...;
-
-                       // Print the contents of the KStream to the local console.
-                       // Java 8+ example, using lambda expressions
-                       stream.foreach((key, value) -> System.out.println(key + " => " + value));
-
-                       // Java 7 example
-                       stream.foreach(
-                           new ForeachAction&lt;String, Long&gt;() {
-                               @Override
-                               public void apply(String key, Long value) {
-                                 System.out.println(key + " => " + value);
-                               }
-                       });
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>GroupByKey</b>: <code>KStream &rarr; KGroupedStream </code></td>
-            <td>
-                <p>
-                Groups the records by the existing key.
-                </p>
-                <p>
-                Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly
-                partitioned ("keyed") for subsequent operations.
-                </p>
-                <p>
-                <b>When to set explicit serdes</b>: Variants of <code>groupByKey</code> exist to override the configured default serdes of
-                your application, which you must do if the key and/or value types of the resulting <code>KGroupedStream</code> do
-                not match the configured default serdes.
-                </p>
-                <p>
-                <b>Note:</b>
-                Grouping vs. Windowing: A related operation is windowing, which lets you control how to "sub-group" the
-                grouped records of the same key into so-called windows for stateful operations such as windowed aggregations
-                or windowed joins.
-                </p>
-                <p>
-                Causes data re-partitioning if and only if the stream was marked for re-partitioning. <code>groupByKey</code> is
-                preferable to <code>groupBy</code> because it re-partitions data only if the stream was already marked for re-partitioning.
-                However, <code>groupByKey</code> does not allow you to modify the key or key type like <code>groupBy</code> does.
-                </p>
-                <pre class="brush: java;">
-                       KStream&lt;byte[], String&gt; stream = ...;
-
-                       // Group by the existing key, using the application's configured
-                       // default serdes for keys and values.
-                       KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey();
-
-                       // When the key and/or value types do not match the configured
-                       // default serdes, we must explicitly specify serdes.
-                       KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey(
-                           Serialized.with(
-                                Serdes.ByteArray(), /* key */
-                                Serdes.String())     /* value */
-                       );
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>GroupBy</b>: <code>KStream &rarr; KGroupedStream or KTable &rarr; KGroupedTable</code></td>
-            <td>
-                <p>
-                Groups the records by a new key, which may be of a different key type. When grouping a table,
-                you may also specify a new value and value type. <code>groupBy</code> is a shorthand for <code>selectKey(...).groupByKey()</code>.
-                </p>
-                <p>
-                Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly
-                partitioned ("keyed") for subsequent operations.
-                </p>
-                <p>
-                <b>When to set explicit serdes</b>: Variants of <code>groupBy</code> exist to override the configured default serdes of your
-                application, which you must do if the key and/or value types of the resulting <code>KGroupedStream</code> or
-                <code>KGroupedTable</code> do not match the configured default serdes.
-                </p>
-                <p>
-                <b>Note:</b>
-                Grouping vs. Windowing: A related operation is windowing, which lets you control how to "sub-group" the
-                grouped records of the same key into so-called windows for stateful operations such as windowed aggregations
-                or windowed joins.
-                </p>
-                <p>
-                <b>Always causes data re-partitioning:</b> <code>groupBy</code> always causes data re-partitioning. If possible use <code>groupByKey</code>
-                instead, which will re-partition data only if required.
-                </p>
-                <pre class="brush: java;">
-                       KStream&lt;byte[], String&gt; stream = ...;
-                       KTable&lt;byte[], String&gt; table = ...;
-
-                       // Java 8+ examples, using lambda expressions
-
-                       // Group the stream by a new key and key type
-                       KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
-                           (key, value) -> value,
-                           Serialized.with(
-                                Serdes.String(), /* key (note: type was modified) */
-                                Serdes.String())  /* value */
-                       );
-
-                       // Group the table by a new key and key type, and also modify the value and value type.
-                       KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
-                           (key, value) -> KeyValue.pair(value, value.length()),
-                           Serialized.with(
-                               Serdes.String(), /* key (note: type was modified) */
-                               Serdes.Integer()) /* value (note: type was modified) */
-                       );
-
-
-                       // Java 7 examples
-
-                       // Group the stream by a new key and key type
-                       KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
-                           new KeyValueMapper&lt;byte[], String, String&gt;() {
-                               @Override
-                               public String apply(byte[] key, String value) {
-                                  return value;
-                               }
-                           },
-                           Serialized.with(
-                                Serdes.String(), /* key (note: type was modified) */
-                                Serdes.String())  /* value */
-                       );
-
-                       // Group the table by a new key and key type, and also modify the value and value type.
-                       KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
-                            new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
-                            @Override
-                                public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
-                                   return KeyValue.pair(value, value.length());
-                                }
-                            },
-                            Serialized.with(
-                                Serdes.String(), /* key (note: type was modified) */
-                                Serdes.Integer()) /* value (note: type was modified) */
-                       );
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Map</b>: <code>KStream &rarr; KStream</code></td>
-            <td>
-                <p>
-                Takes one record and produces one record. You can modify the record key and value, including their types.
-                </p>
-
-                <p>
-                <b>Marks the stream for data re-partitioning:</b> Applying a grouping or a join after <code>map</code> will result in
-                re-partitioning of the records. If possible use <code>mapValues</code> instead, which will not cause data re-partitioning.
-                </p>
-
-                <pre class="brush: java;">
-                       KStream&lt;byte[], String&gt; stream = ...;
-
-                       // Java 8+ example, using lambda expressions
-                       // Note how we change the key and the key type (similar to `selectKey`)
-                       // as well as the value and the value type.
-                       KStream&lt;String, Integer&gt; transformed = stream.map(
-                           (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
-
-                       // Java 7 example
-                       KStream&lt;String, Integer&gt; transformed = stream.map(
-                           new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
-                           @Override
-                           public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
-                               return new KeyValue&lt;&gt;(value.toLowerCase(), value.length());
-                           }
-                       });
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Map (values only)</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
-            <td>
-                <p>
-                Takes one record and produces one record, while retaining the key of the original record. You can modify
-                the record value and the value type.
-                </p>
-                <p>
-                <code>mapValues</code> is preferable to <code>map</code> because it will not cause data re-partitioning. However, it does not
-                allow you to modify the key or key type like <code>map</code> does.
-                </p>
-
-                <pre class="brush: java;">
-                       KStream&lt;byte[], String&gt; stream = ...;
-                       KTable&lt;String, String&gt; table = ...;
-
-                       // Java 8+ example, using lambda expressions
-                       KStream&lt;byte[], String&gt; uppercased = stream.mapValues(value -> value.toUpperCase());
-
-                       // Java 7 example
-                       KStream&lt;byte[], String&gt; uppercased = stream.mapValues(
-                          new ValueMapper&lt;String, String&gt;() {
-                          @Override
-                          public String apply(String s) {
-                             return s.toUpperCase();
-                          }
-                       });
-
-                       // mapValues on a KTable and also materialize the results into a state store
-                       table.mapValues(value -> value.toUpperCase(), Materialized.&lt;String, String, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("uppercased"));
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Print</b>: <code>KStream &rarr; void </code></td>
-            <td>
-                <p>
-                Terminal operation. Prints the records to <code>System.out</code>. See Javadocs for serde and <code>toString()</code> caveats.
-                </p>
-                <pre class="brush: java;">
-                       KStream&lt;byte[], String&gt; stream = ...;
-                       stream.print();
-                    
-                       // You can also override how and where the data is printed, e.g., to a file:
-                       stream.print(Printed.toFile("stream.out"));
-
-                       // with a custom KeyValueMapper and label
-                       stream.print(Printed.toSysOut()
-                                .withLabel("my-stream")
-                                .withKeyValueMapper((key, value) -> key + " -> " + value));
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>SelectKey</b>: <code>KStream &rarr; KStream</code></td>
-            <td>
-                <p>
-                Assigns a new key, possibly of a new key type, to each record.
-                </p>
-                <p>
-                Marks the stream for data re-partitioning: Applying a grouping or a join after <code>selectKey</code> will result in
-                re-partitioning of the records.
-                </p>
-
-                <pre class="brush: java;">
-                       KStream&lt;byte[], String&gt; stream = ...;
-
-                       // Derive a new record key from the record's value.  Note how the key type changes, too.
-                       // Java 8+ example, using lambda expressions
-                       KStream&lt;String, String&gt; rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
-
-                       // Java 7 example
-                       KStream&lt;String, String&gt; rekeyed = stream.selectKey(
-                           new KeyValueMapper&lt;byte[], String, String&gt;() {
-                           @Override
-                           public String apply(byte[] key, String value) {
-                              return value.split(" ")[0];
-                           }
-                         });
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Table to Stream</b>: <code>KTable &rarr; KStream</code></td>
-            <td>
-                <p>
-                Converts this table into a stream.
-                </p>
-                <pre class="brush: java;">
-                       KTable&lt;byte[], String> table = ...;
-
-                       // Also, a variant of `toStream` exists that allows you
-                       // to select a new key for the resulting stream.
-                       KStream&lt;byte[], String> stream = table.toStream();
-                </pre>
-            </td>
-        </tr>
-        <tr>
-            <td><b>WriteAsText</b>: <code>KStream &rarr; void </code></td>
-            <td>
-                <p>
-                Terminal operation. Write the records to a file. See Javadocs for serde and <code>toString()</code> caveats.
-                </p>
-                <pre class="brush: java;">
-                       KStream&lt;byte[], String&gt; stream = ...;
-                       stream.writeAsText("/path/to/local/output.txt");
-
-                       // Several variants of `writeAsText` exist to e.g. override the
-                       // default serdes for record keys and record values.
-                       stream.writeAsText("/path/to/local/output.txt", Serdes.ByteArray(), Serdes.String());
-                </pre>
-            </td>
-        </tr>
-        </tbody>
-    </table>
-
-
-    <h5><a id="streams_dsl_transformations_stateful" href="#streams_dsl_transformations_stateful">Stateful transformations</a></h5>
-    <h6><a id="streams_dsl_transformations_stateful_overview" href="#streams_dsl_transformations_stateful_overview">Overview</a></h6>
-    <p>
-        Stateful transformations, by definition, depend on state for processing inputs and producing outputs, and
-        hence implementation-wise they require a state store associated with the stream processor. For example,
-        in aggregating operations, a windowing state store is used to store the latest aggregation results per window;
-        in join operations, a windowing state store is used to store all the records received so far within the
-        defined window boundary.
-    </p>
-    <p>
-        Note that state stores are fault-tolerant. In case of failure, Kafka Streams guarantees to fully restore
-        all state stores prior to resuming the processing.
-    </p>
-    <p>
-        Available stateful transformations in the DSL include:
-    <ul>
-        <li><a href=#streams_dsl_aggregations>Aggregating</a></li>
-        <li><a href="#streams_dsl_joins">Joining</a></li>
-        <li><a href="#streams_dsl_windowing">Windowing (as part of aggregations and joins)</a></li>
-        <li>Applying custom processors and transformers, which may be stateful, for Processor API integration (see the sketch right after this list)</li>
-    </ul>
-    </p>
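-    <p>
-    As a minimal sketch of the last item in the list above, a custom processor can be attached to a stream via <code>process()</code>.
-    The example below (with made-up variable names) only prints each record; registering a state store and fetching it through the
-    processor context is omitted here, but that is how such a processor would become stateful.
-    </p>
-    <pre class="brush: java;">
-        import org.apache.kafka.streams.processor.AbstractProcessor;
-        import org.apache.kafka.streams.processor.Processor;
-        import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-        KStream&lt;String, Long&gt; stream = ...;
-
-        // Terminal operation: attach a custom processor to the stream.
-        stream.process(new ProcessorSupplier&lt;String, Long&gt;() {
-            @Override
-            public Processor&lt;String, Long&gt; get() {
-                return new AbstractProcessor&lt;String, Long&gt;() {
-                    @Override
-                    public void process(String key, Long value) {
-                        // Custom, potentially stateful, per-record logic goes here.
-                        // State stores passed via `process(supplier, storeNames)` could be
-                        // looked up through context() in init() or process().
-                        System.out.println(key + " => " + value);
-                    }
-                };
-            }
-        });
-    </pre>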
-    <p>
-        The following diagram shows their relationships:
-    </p>
-    <figure>
-        <img class="centered" src="/{{version}}/images/streams-stateful_operations.png" style="width:500pt;">
-        <figcaption style="text-align: center;"><i>Stateful transformations in the DSL</i></figcaption>
-    </figure>
-
-    <p>
-        We will discuss the various stateful transformations in detail in the subsequent sections. However, let's start
-        with a first example of a stateful application: the canonical WordCount algorithm.
-    </p>
-    <p>
-        WordCount example in Java 8+, using lambda expressions:
-    </p>
-    <pre class="brush: java;">
-        // We assume record values represent lines of text.  For the sake of this example, we ignore
-        // whatever may be stored in the record keys.
-        KStream&lt;String, String&gt; textLines = ...;
-
-        KStream&lt;String, Long&gt; wordCounts = textLines
-            // Split each text line, by whitespace, into words.  The text lines are the record
-            // values, i.e. we can ignore whatever data is in the record keys and thus invoke
-            // `flatMapValues` instead of the more generic `flatMap`.
-            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-            // Group the stream by word to ensure the key of the record is the word.
-            .groupBy((key, word) -> word)
-            // Count the occurrences of each word (record key).
-            //
-            // This will change the stream type from `KGroupedStream&lt;String, String&gt;` to
-            // `KTable&lt;String, Long&gt;` (word -> count).  We must provide a name for
-            // the resulting KTable, which will be used to name e.g. its associated
-            // state store and changelog topic.
-            .count("Counts")
-            // Convert the `KTable&lt;String, Long&gt;` into a `KStream&lt;String, Long&gt;`.
-            .toStream();
-    </pre>
-    <p>
-        WordCount example in Java 7:
-    </p>
-    <pre class="brush: java;">
-        // Code below is equivalent to the previous Java 8+ example above.
-        KStream&lt;String, String&gt; textLines = ...;
-
-        KStream&lt;String, Long&gt; wordCounts = textLines
-            .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
-                @Override
-                public Iterable&lt;String&gt; apply(String value) {
-                    return Arrays.asList(value.toLowerCase().split("\\W+"));
-                }
-            })
-            .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
-                @Override
-                public String apply(String key, String word) {
-                    return word;
-                }
-            })
-            .count("Counts")
-            .toStream();
-    </pre>
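-    <p>
-    As a possible follow-up to the WordCount example (the output topic name below is just an assumption for illustration), the resulting
-    stream could be written back to Kafka, overriding the default serdes because the value type is now <code>Long</code>:
-    </p>
-    <pre class="brush: java;">
-        // Write the word counts to an output topic with explicit serdes.
-        wordCounts.to(Serdes.String(), Serdes.Long(), "word-counts-output-topic");
-    </pre>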
+    <h4><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h4>
 
-    <h6><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate a stream</a></h6>
     <p>
-        Once records are grouped by key via <code>groupByKey</code> or <code>groupBy</code> -- and
-        thus represented as either a <code>KGroupedStream</code> or a
-        <code>KGroupedTable</code> -- they can be aggregated via an operation such as
-        <code>reduce</code>.
-        For windowed aggregations use <code>windowedBy(Windows).reduce(Reducer)</code>.
-        Aggregations are <i>key-based</i> operations, i.e. they always operate over records (notably record values) <i>of the same key</i>.
-        You may choose to perform aggregations on
-        <a href="#streams_dsl_windowing">windowed</a> or non-windowed data.
+        Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code> or <code>GlobalKTable</code>)
+        can be created as a source stream from one or more Kafka topics (for <code>KTable</code> and <code>GlobalKTable</code> you can only create the source stream
+        from a single topic).
     </p>
-    <table class="data-table" border="1">
-        <tbody>
-        <tr>
-            <th>Transformation</th>
-            <th>Description</th>
-        </tr>
-        <tr>
-            <td><b>Aggregate</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable
-                &rarr; KTable</code></td>
-            <td>
-                <p>
-                    <b>Rolling aggregation</b>. Aggregates the values of (non-windowed) records by
-                    the grouped key. Aggregating is a generalization of <code>reduce</code> and allows, for example, the
-                    aggregate value to have a different type than the input values.
-                </p>
-                <p>
-                    When aggregating a grouped stream, you must provide an initializer (think:
-                    <code>aggValue = 0</code>) and an "adder"
-                    aggregator (think: <code>aggValue + curValue</code>). When aggregating a <i>grouped</i>
-                    table, you must additionally provide a "subtractor" aggregator (think: <code>aggValue - oldValue</code>).
-                </p>
-                <p>
-                    Several variants of <code>aggregate</code> exist, see Javadocs for details.
-                </p>
-                <pre class="brush: java;">
-                    KGroupedStream&lt;Bytes, String&gt; groupedStream = ...;
-                    KGroupedTable&lt;Bytes, String&gt; groupedTable = ...;
-
-                    // Java 8+ examples, using lambda expressions
-
-                    // Aggregating a KGroupedStream (note how the value type changes from String to Long)
-                    KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
-                        () -> 0L, /* initializer */
-                        (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
-                        Serdes.Long(), /* serde for aggregate value */
-                        "aggregated-stream-store" /* state store name */);
-
-                    // Aggregating a KGroupedTable (note how the value type changes from String to Long)
-                    KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
-                        () -> 0L, /* initializer */
-                        (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
-                        (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
-                        Serdes.Long(), /* serde for aggregate value */
-                        "aggregated-table-store" /* state store name */);
-
-
-                    // windowed aggregation
-                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream
-                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
-                        .aggregate(
-                            () -> 0L, /* initializer */
-                            (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
-                            Materialized.&lt;Bytes, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("windowed-aggregated-stream-store") /* state store name */
-                                .withValueSerde(Serdes.Long())); /* serde for aggregate value */
-
-
-                    // Java 7 examples
-
-                    // Aggregating a KGroupedStream (note how the value type changes from String to Long)
-                    KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
-                        new Initializer&lt;Long&gt;() { /* initializer */
-                          @Override
-                          public Long apply() {
-                            return 0L;
-                          }
-                        },
-                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(Bytes aggKey, String newValue, Long aggValue) {
-                            return aggValue + newValue.length();
-                          }
-                        },
-                        Serdes.Long(),
-                        "aggregated-stream-store");
-
-                    // Aggregating a KGroupedTable (note how the value type changes from String to Long)
-                    KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
-                        new Initializer&lt;Long&gt;() { /* initializer */
-                          @Override
-                          public Long apply() {
-                            return 0L;
-                          }
-                        },
-                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(Bytes aggKey, String newValue, Long aggValue) {
-                            return aggValue + newValue.length();
-                          }
-                        },
-                        new Aggregator&lt;Bytes, String, Long&gt;() { /* subtractor */
-                          @Override
-                          public Long apply(Bytes aggKey, String oldValue, Long aggValue) {
-                            return aggValue - oldValue.length();
-                          }
-                        },
-                        Serdes.Long(),
-                        "aggregated-table-store");
-
-                    // Windowed aggregation
-                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream
-                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
-                        .aggregate(
-                            new Initializer&lt;Long&gt;() { /* initializer */
-                              @Override
-                              public Long apply() {
-                                return 0L;
-                              }
-                            },
-                            new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
-                              @Override
-                              public Long apply(Bytes aggKey, String newValue, Long aggValue) {
-                                return aggValue + newValue.length();
-                              }
-                            },
-                            Materialized.&lt;Bytes, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("windowed-aggregated-stream-store") /* state store name */
-                                .withValueSerde(Serdes.Long())); /* serde for aggregate value */
-                </pre>
-                <p>
-                    Detailed behavior of <code>KGroupedStream</code>:
-                </p>
-                <ul>
-                    <li>Input records with <code>null</code> keys are ignored in general.</li>
-                    <li>When a record key is received for the first time, the initializer is called
-                        (and called before the adder).</li>
-                    <li>Whenever a record with a non-null value is received, the adder is called.</li>
-                </ul>
-                <p>
-                    Detailed behavior of <code>KGroupedTable</code>:
-                </p>
-                <ul>
-                    <li>Input records with <code>null</code> keys are ignored in general.</li>
-                    <li>When a record key is received for the first time, the initializer is called
-                        (and called before the adder and subtractor). Note that, in contrast to <code>KGroupedStream</code>, over
-                        time the initializer may be called more
-                        than once for a key as a result of having received input tombstone records
-                        for that key (see below).</li>
-                    <li>When the first non-<code>null</code> value is received for a key (think:
-                        INSERT), then only the adder is called.</li>
-                    <li>When subsequent non-<code>null</code> values are received for a key (think:
-                        UPDATE), then (1) the subtractor is called
-                        with the old value as stored in the table and (2) the adder is called with
-                        the new value of the input record
-                        that was just received. The order of execution for the subtractor and adder
-                        is not defined.</li>
-                    <li>When a tombstone record -- i.e. a record with a <code>null</code> value -- is
-                        received for a key (think: DELETE), then
-                        only the subtractor is called. Note that, whenever the subtractor returns a
-                    <code>null</code> value itself, then the
-                    corresponding key is removed from the resulting KTable. If that happens, any
-                    next input record for that key will trigger the initializer again.</li>
-                </ul>
-                <p>
-                    See the example at the bottom of this section for a visualization of the
-                    aggregation semantics.
-                </p>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Aggregate (windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
-            <td>
-                <p>
-                    <b>Windowed aggregation</b>. Aggregates the values of records, per window, by
-                    the grouped key. Aggregating is a generalization of
-                    <code>reduce</code> and allows, for example, the aggregate value to have a
-                    different type than the input values.
-                </p>
-                <p>
-                    You must provide an initializer (think: <code>aggValue = 0</code>), "adder"
-                    aggregator (think: <code>aggValue + curValue</code>),
-                    and a window. When windowing based on sessions, you must additionally provide a
-                    "session merger" aggregator (think:
-                    <code>mergedAggValue = leftAggValue + rightAggValue</code>).
-                </p>
-                <p>
-                    The windowed <code>aggregate</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
-                </p>
-                <p>
-                    Several variants of <code>aggregate</code> exist, see Javadocs for details.
-                </p>
-
-                <pre class="brush: java;">
-                    import java.util.concurrent.TimeUnit;
-                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
-
-                    // Java 8+ examples, using lambda expressions
-
-                    // Aggregating with time-based windowing (here: with 5-minute tumbling windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
-                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
-                        .aggregate(
-                            () -> 0L, /* initializer */
-                            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
-                            Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
-                                .withValueSerde(Serdes.Long())); /* serde for aggregate value */
-
-
-                    // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream
-                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
-                        .aggregate(
-                            () -> 0L, /* initializer */
-                            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
-                            (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
-                            Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
-                                .withValueSerde(Serdes.Long())); /* serde for aggregate value */
-
-                    // Java 7 examples
-
-                    // Aggregating with time-based windowing (here: with 5-minute tumbling windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
-                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
-                        .aggregate(
-                            new Initializer&lt;Long&gt;() { /* initializer */
-                              @Override
-                              public Long apply() {
-                                return 0L;
-                              }
-                            },
-                            new Aggregator&lt;String, Long, Long&gt;() { /* adder */
-                              @Override
-                              public Long apply(String aggKey, Long newValue, Long aggValue) {
-                                return aggValue + newValue;
-                              }
-                            },
-                            Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
-                                    .withValueSerde(Serdes.Long()) /* serde for aggregate value */
-                    );
-
-                    // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream
-                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
-                        .aggregate(
-                            new Initializer&lt;Long&gt;() { /* initializer */
-                              @Override
-                              public Long apply() {
-                                return 0L;
-                              }
-                            },
-                            new Aggregator&lt;String, Long, Long&gt;() { /* adder */
-                              @Override
-                              public Long apply(String aggKey, Long newValue, Long aggValue) {
-                                return aggValue + newValue;
-                              }
-                            },
-                            new Merger&lt;String, Long&gt;() { /* session merger */
-                              @Override
-                              public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
-                                return rightAggValue + leftAggValue;
-                              }
-                            },
-                            Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
-                                .withValueSerde(Serdes.Long()) /* serde for aggregate value */
-                    );
-                </pre>
-
-                <p>
-                    Detailed behavior:
-                </p>
-                <ul>
-                    <li>The windowed aggregate behaves similarly to the rolling aggregate described
-                        above. The additional twist is that the behavior applies per window.</li>
-                    <li>Input records with <code>null</code> keys are ignored in general.</li>
-                    <li>When a record key is received for the first time for a given window, the
-                        initializer is called (and called before the adder).</li>
-                    <li>Whenever a record with a non-<code>null</code> value is received for a given window, the
-                        adder is called.
-                        (Note: As a result of a known bug in Kafka 0.11.0.0, the adder is currently
-                        also called for <code>null</code> values. You can work around this, for example, by
-                        manually filtering out <code>null</code> values prior to grouping the stream.)</li>
-                    <li>When using session windows: the session merger is called whenever two
-                        sessions are being merged.</li>
-                </ul>
-                <p>
-                See the example at the bottom of this section for a visualization of the aggregation semantics.
-                </p>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Count</b>: <code>KGroupedStream &rarr; KTable or KGroupedTable &rarr; KTable</code></td>
-            <td>
-                <p>
-                    <b>Rolling aggregation</b>. Counts the number of records by the grouped key.
-                    Several variants of <code>count</code> exist, see Javadocs for details.
-                </p>
-                <pre class="brush: java;">
-                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
-                    KGroupedTable&lt;String, Long&gt; groupedTable = ...;
-
-                    // Counting a KGroupedStream
-                    KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count();
-
-                    // Counting a KGroupedTable
-                    KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count();
-                </pre>
-                <p>
-                    Detailed behavior for <code>KGroupedStream</code>:
-                </p>
-                <ul>
-                    <li>Input records with null keys or values are ignored.</li>
-                </ul>
-                <p>
-                    Detailed behavior for <code>KGroupedTable</code>:
-                </p>
-                <ul>
-                    <li>Input records with <code>null</code> keys are ignored. Records with <code>null</code>
-                        values are not ignored but interpreted as "tombstones" for the corresponding key, which
-                        indicate the deletion of the key from the table.</li>
-                </ul>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Count (Windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
-            <td>
-                <p>
-                    Windowed aggregation. Counts the number of records, per window, by the grouped key.
-                </p>
-                <p>
-                    The windowed <code>count</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, Long&gt;</code>.
-                </p>
-                <p>
-                    Several variants of count exist, see Javadocs for details.
-                </p>
-                <pre class="brush: java;">
-                    import java.util.concurrent.TimeUnit;
-                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
-
-                    // Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream = groupedStream
-                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
-                        .count();
-
-                    // Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream = groupedStream
-                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
-                        .count();
-                </pre>
-                <p>
-                    Detailed behavior:
-                </p>
-                <ul>
-                    <li>Input records with <code>null</code> keys or values are ignored. (Note: As a result of a known bug in Kafka 0.11.0.0,
-                        records with <code>null</code> values are not ignored yet. You can work around this, for example, by manually
-                        filtering out <code>null</code> values prior to grouping the stream.)</li>
-                </ul>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Reduce</b>: <code>KGroupedStream &rarr; KTable or KGroupedTable &rarr; KTable</code></td>
-            <td>
-                <p>
-                <b>Rolling aggregation</b>. Combines the values of (non-windowed) records by the grouped key. The current record value is
-                combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed,
-                unlike <code>aggregate</code>.
-                </p>
-
-                <p>
-                When reducing a grouped stream, you must provide an "adder" reducer (think: <code>aggValue + curValue</code>).
-                When reducing a grouped table, you must additionally provide a "subtractor" reducer (think: <code>aggValue - oldValue</code>).
-                </p>
-                <p>
-                Several variants of <code>reduce</code> exist, see Javadocs for details.
-                </p>
-                <pre class="brush: java;">
-                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
-                    KGroupedTable&lt;String, Long&gt; groupedTable = ...;
-
-                    // Java 8+ examples, using lambda expressions
-
-                    // Reducing a KGroupedStream
-                    KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
-                        (aggValue, newValue) -> aggValue + newValue /* adder */
-                    );
-
-                    // Reducing a KGroupedTable
-                    KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
-                        (aggValue, newValue) -> aggValue + newValue, /* adder */
-                        (aggValue, oldValue) -> aggValue - oldValue /* subtractor */
-                    );
-
-
-                    // Java 7 examples
-
-                    // Reducing a KGroupedStream
-                    KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
-                        new Reducer&lt;Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(Long aggValue, Long newValue) {
-                            return aggValue + newValue;
-                          }
-                        }
-                    );
-
-                    // Reducing a KGroupedTable
-                    KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
-                        new Reducer&lt;Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(Long aggValue, Long newValue) {
-                            return aggValue + newValue;
-                          }
-                        },
-                        new Reducer&lt;Long&gt;() { /* subtractor */
-                          @Override
-                          public Long apply(Long aggValue, Long oldValue) {
-                            return aggValue - oldValue;
-                          }
-                        }
-                    );
-                </pre>
-                <p>
-                    Detailed behavior for <code>KGroupedStream</code>:
-                </p>
-                <ul>
-                    <li>Input records with <code>null</code> keys are ignored in general.</li>
-                    <li>When a record key is received for the first time, then the value of that
-                        record is used as the initial aggregate value.</li>
-                    <li>Whenever a record with a non-<code>null</code> value is received, the adder is called.</li>
-                </ul>
-                <p>
-                Detailed behavior for <code>KGroupedTable</code>:
-                </p>
-                <ul>
-                    <li>Input records with <code>null</code> keys are ignored in general.</li>
-                    <li>When a record key is received for the first time, then the value of that
-                        record is used as the initial aggregate value.
-                        Note that, in contrast to KGroupedStream, over time this initialization step
-                        may happen more than once for a key as a
-                        result of having received input tombstone records for that key (see below).</li>
-                    <li>When the first non-<code>null</code> value is received for a key (think: INSERT), then
-                        only the adder is called.</li>
-                    <li>When subsequent non-<code>null</code> values are received for a key (think: UPDATE), then
-                        (1) the subtractor is called with the
-                        old value as stored in the table and (2) the adder is called with the new
-                        value of the input record that was just received.
-                        The order of execution for the subtractor and adder is not defined.</li>
-                    <li>When a tombstone record -- i.e. a record with a <code>null</code> value -- is received
-                        for a key (think: DELETE), then only the
-                        subtractor is called. Note that, whenever the subtractor returns a <code>null</code>
-                        value itself, then the corresponding key
-                        is removed from the resulting KTable. If that happens, any next input
-                        record for that key will re-initialize its aggregate value.</li>
-                </ul>
-                <p>
-                See the example at the bottom of this section for a visualization of the
-                aggregation semantics.
-                </p>
-            </td>
-        </tr>
-        <tr>
-            <td><b>Reduce (windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
-            <td>
-                <p>
-                Windowed aggregation. Combines the values of records, per window, by the grouped key. The current record value
-                is combined with the last reduced value, and a new reduced value is returned. Records with <code>null</code> key or value are
-                ignored. The result value type cannot be changed, unlike <code>aggregate</code> (see the non-windowed <code>reduce</code> above for <code>KGroupedStream</code> details).
-                </p>
-                <p>
-                The windowed reduce turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
-                </p>
-                <p>
-                Several variants of reduce exist, see Javadocs for details.
-                </p>
-                <pre class="brush: java;">
-                    import java.util.concurrent.TimeUnit;
-                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
-
-                    // Java 8+ examples, using lambda expressions
-
-                    // Aggregating with time-based windowing (here: with 5-minute tumbling windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
-                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
-                        .reduce((aggValue, newValue) -> aggValue + newValue /* adder */);
-
-                    // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream
-                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
-                        .reduce((aggValue, newValue) -> aggValue + newValue); /* adder */
-
-
-                    // Java 7 examples
-
-                    // Aggregating with time-based windowing (here: with 5-minute tumbling windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
-                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
-                        .reduce(
-                            new Reducer&lt;Long&gt;() { /* adder */
-                              @Override
-                              public Long apply(Long aggValue, Long newValue) {
-                                return aggValue + newValue;
-                              }
-                            });
-
-                    // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
-                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
-                        .reduce(
-                            new Reducer&lt;Long&gt;() { /* adder */
-                              @Override
-                              public Long apply(Long aggValue, Long newValue) {
-                                return aggValue + newValue;
-                              }
-                        });
-                </pre>
-
-                <p>
-                Detailed behavior:
-                </p>
-                <ul>
-                    <li>The windowed reduce behaves similar to the rolling reduce described above. The additional twist is that
-                        the behavior applies per window.</li>
-                    <li>Input records with <code>null</code> keys are ignored in general.</li>
-                    <li>When a record key is received for the first time for a given window, then the value of that record is
-                    used as the initial aggregate value.</li>
-                    <li>Whenever a record with a non-<code>null</code> value is received for a given window, the adder is called. (Note: As
-                    a result of a known bug in Kafka 0.11.0.0, the adder is currently also called for <code>null</code> values. You can work
-                    around this, for example, by manually filtering out <code>null</code> values prior to grouping the stream.)</li>
-                    <li>See the example at the bottom of this section for a visualization of the aggregation semantics.</li>
-                </ul>
-            </td>
-        </tr>
-        </tbody>
-    </table>
 
-    <p>
-        <b>Example of semantics for stream aggregations</b>: A <code>KGroupedStream &rarr; KTable</code> example is shown below. The streams and the table are
-        initially empty. We use bold font in the column for "KTable <code>aggregated</code>" to highlight changed state. An entry such as <code>(hello, 1)</code>
-        denotes a record with key <code>hello</code> and value <code>1</code>. To improve the readability of the semantics table we assume that all records are
-        processed in timestamp order.
-    </p>
     <pre class="brush: java;">
-        // Key: word, value: count
-        Properties streamsProperties == ...;
-
-        // specify the default serdes so we don't need to elsewhere.
-        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
-        StreamsConfig config = new StreamsConfig(streamsProperties);
-
-        KStream&lt;String, Integer&gt; wordCounts = ...;
+    KStreamBuilder builder = new KStreamBuilder();
 
-        KGroupedStream&lt;String, Integer&gt; groupedStream = wordCounts
-            .groupByKey();
-
-        KTable&lt;String, Integer&gt; aggregated = groupedStream.aggregate(
-            () -> 0, /* initializer */
-            (aggKey, newValue, aggValue) -> aggValue + newValue /* adder */
-        );
-    </pre>
-
-    <p>
-        <b>Impact of <a href=#streams_developer-guide_memory-management_record-cache>record caches</a></b>: For illustration purposes,
-        the column "KTable <code>aggregated</code>" below shows the table's state changes over
-        time in a very granular way. In practice, you would observe state changes in such a granular way only when record caches are
-        disabled (default: enabled). When record caches are enabled, what might happen for example is that the output results of the
-        rows with timestamps 4 and 5 would be compacted, and there would only be a single state update for the key <code>kafka</code> in the KTable
-        (here: from <code>(kafka 1)</code> directly to <code>(kafka, 3)</code>. Typically, you should only disable record caches for testing or debugging purposes
-        -- under normal circumstances it is better to leave record caches enabled.
-    </p>
-    <table class="data-table" border="1">
-        <thead>
-        <col>
-        <colgroup span="2"></colgroup>
-        <colgroup span="2"></colgroup>
-        <col>
-        <tr>
-            <th scope="col"></th>
-            <th colspan="2">KStream wordCounts</th>
-            <th colspan="2">KGroupedStream groupedStream</th>
-            <th scope="col">KTable aggregated</th>
-        </tr>
-        </thead>
-        <tbody>
-        <tr>
-            <th scope="col">Timestamp</th>
-            <th scope="col">Input record</th>
-            <th scope="col">Grouping</th>
-            <th scope="col">Initializer</th>
-            <th scope="col">Adder</th>
-            <th scope="col">State</th>
-        </tr>
-        <tr>
-            <td>1</td>
-            <td>(hello, 1)</td>
-            <td>(hello, 1)</td>
-            <td>0 (for hello)</td>
-            <td>(hello, 0 + 1)</td>
-            <td>(hello, 1)</td>
-        </tr>
-        <tr>
-            <td>2</td>
-            <td>(kafka, 1)</td>
-            <td>(kafka, 1)</td>
-            <td>0 (for kafka)</td>
-            <td>(kafka, 0 + 1)</td>
-            <td>(hello, 1), (kafka, 1)</td>
-        </tr>
-        <tr>
-            <td>3</td>
-            <td>(streams, 1)</td>
-            <td>(streams, 1)</td>
-            <td>0 (for streams)</td>
-            <td>(streams, 0 + 1)</td>
-            <td>(hello, 1), (kafka, 1), (streams, 1)</td>
-        </tr>
-        <tr>
-            <td>4</td>
-            <td>(kafka, 1)</td>
-            <td>(kafka, 1)</td>
-            <td></td>
-            <td>(kafka, 1 + 1)</td>
-            <td>(hello, 1), (kafka, 2), (streams, 1)</td>
-        </tr>
-        <tr>
-            <td>5</td>
-            <td>(kafka, 1)</td>
-            <td>(kafka, 1)</td>
-            <td></td>
-            <td>(kafka, 2 + 1)</td>
-            <td>(hello, 1), (kafka, 3), (streams, 1)</td>
-        </tr>
-        <tr>
-            <td>6</td>
-            <td>(streams, 1)</td>
-            <td>(streams, 1)</td>
-            <td></td>
-            <td>(streams, 1 + 1)</td>
-            <td>(hello, 1), (kafka, 3), (streams, 2)</td>
-        </tr>
-        </tbody>
-    </table>
-    <p>
-    Example of semantics for table aggregations: A <code>KGroupedTable &rarr; KTable</code> example is shown below. The tables are initially empty.
-    We use bold font in the column for "KTable <code>aggregated</code>" to highlight changed state. An entry such as <code>(hello, 1)</code> denotes a
-    record with key <code>hello</code> and value <code>1</code>. To improve the readability of the semantics table we assume that all records are processed
-    in timestamp order.
-    </p>
-    <pre class="brush: java;">
-        // Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")
-        KTable&lt;String, String&gt; userProfiles = ...;
-
-        // Re-group `userProfiles`.  Don't read too much into what the grouping does:
-        // its prime purpose in this example is to show the *effects* of the grouping
-        // in the subsequent aggregation.
-        KGroupedTable&lt;String, Integer&gt; groupedTable = userProfiles
-            .groupBy((user, region) -> KeyValue.pair(region, user.length()), Serialized.with(Serdes.String(), Serdes.Integer()));
-
-        KTable&lt;String, Integer&gt; aggregated = groupedTable.aggregate(
-            () -> 0, /* initializer */
-            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
-            (aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
-            Materialized.&lt;String, Integer, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("aggregated-table-store")
-                .withKeySerde(Serdes.String() /* serde for aggregate key */)
-                .withValueSerde(Serdes.Long() /* serde for aggregate value */)
-        );
+    KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
+    KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
+    GlobalKTable&lt;String, GenericRecord&gt; source3 = builder.globalTable("topic4", "globalStoreName");
     </pre>
-    <p>
-        <b>Impact of <a href=#streams_developer-guide_memory-management_record-cache>record caches</a></b>:
-        For illustration purposes, the column "KTable <code>aggregated</code>" below shows
-        the table's state changes over time in a very granular way. In practice, you would observe state changes
-        in such a granular way only when record caches are disabled (default: enabled). When record caches are enabled,
-        what might happen for example is that the output results of the rows with timestamps 4 and 5 would be
-        compacted, and there would only be a single state update for the key <code>kafka</code> in the KTable
-        (here: from <code>(kafka 1)</code> directly to <code>(kafka, 3)</code>. Typically, you should only disable
-        record caches for testing or debugging purposes -- under normal circumstances it is better to leave record caches enabled.
-    </p>
-    <table class="data-table" border="1">
-        <thead>
-        <col>
-        <colgroup span="2"></colgroup>
-        <colgroup span="2"></colgroup>
-        <col>
-        <tr>
-            <th scope="col"></th>
-            <th colspan="3">KTable userProfiles</th>
-            <th colspan="3">KGroupedTable groupedTable</th>
-            <th scope="col">KTable aggregated</th>
-        </tr>
-        </thead>
-        <tbody>
-        <tr>
-            <th scope="col">Timestamp</th>
-            <th scope="col">Input record</th>
-            <th scope="col">Interpreted as</th>
-            <th scope="col">Grouping</th>
-            <th scope="col">Initializer</th>
-            <th scope="col">Adder</th>
-            <th scope="col">Subtractor</th>
-            <th scope="col">State</th>
-        </tr>
-        <tr>
-            <td>1</td>
-            <td>(alice, E)</td>
-            <td>INSERT alice</td>
-            <td>(E, 5)</td>
-            <td>0 (for E)</td>
-            <td>(E, 0 + 5)</td>
-            <td></td>
-            <td>(E, 5)</td>
-        </tr>
-        <tr>
-            <td>2</td>
-            <td>(bob, A)</td>
-            <td>INSERT bob</td>
-            <td>(A, 3)</td>
-            <td>0 (for A)</td>
-            <td>(A, 0 + 3)</td>
-            <td></td>
-            <td>(A, 3), (E, 5)</td>
-        </tr>
-        <tr>
-            <td>3</td>
-            <td>(charlie, A)</td>
-            <td>INSERT charlie</td>
-            <td>(A, 7)</td>
-            <td></td>
-            <td>(A, 3 + 7)</td>
-            <td></td>
-            <td>(A, 10), (E, 5)</td>
-        </tr>
-        <tr>
-            <td>4</td>
-            <td>(alice, A)</td>
-            <td>UPDATE alice</td>
-            <td>(A, 5)</td>
-            <td></td>
-            <td>(A, 10 + 5)</td>
-            <td>(E, 5 - 5)</td>
-            <td>(A, 15), (E, 0)</td>
-        </tr>
-        <tr>
-            <td>5</td>
-            <td>(charlie, null)</td>
-            <td>DELETE charlie</td>
-            <td>(null, 7)</td>
-            <td></td>
-            <td></td>
-            <td>(A, 15 - 7)</td>
-            <td>(A, 8), (E, 0)</td>
-        </tr>
-        <tr>
-            <td>6</td>
-            <td>(null, E)</td>
-            <td>ignored</td>
-            <td></td>
-            <td></td>
-            <td></td>
-            <td></td>
-            <td>(A, 8), (E, 0)</td>
-        </tr>
-        <tr>
-            <td>7</td>
-            <td>(bob, E)</td>
-            <td>UPDATE bob</td>
-            <td>(E, 3)</td>
-            <td></td>
-            <td>(E, 0 + 3)</td>
-            <td>(A, 8 - 3)</td>
-            <td>(A, 5), (E, 3)</td>
-        </tr>
-        </tbody>
-    </table>
 
-    <h6><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h6>
+    <h4><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h4>
     A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows:
     <ul>
         <li><b>Hopping time windows</b> are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap a data record may b [...]
@@ -2002,7 +308,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         Kafka Streams is able to properly handle late-arriving records.
     </p>
 
-    <h6><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple streams</a></h6>
+    <h4><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple streams</a></h4>
     A <b>join</b> operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely. In Kafka Streams, you may perform the following join operations:
     <ul>
         <li><b>KStream-to-KStream Joins</b> are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code> instance representing the result stream of the join is returned from t [...]
@@ -2020,53 +326,85 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     Depending on the operands the following join operations are supported: <b>inner joins</b>, <b>outer joins</b> and <b>left joins</b>.
     Their <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">semantics</a> are similar to the corresponding operators in relational databases.
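+    <p>
+        For illustration, here is a minimal sketch of a windowed KStream-to-KStream inner join; the input streams, their value types, and the 5-minute window are assumptions made for this example only:
+    </p>
+
+    <pre class="brush: java;">
+    // written in Java 8+, using lambda expressions
+    KStream&lt;String, String&gt; left = ...;
+    KStream&lt;String, String&gt; right = ...;
+
+    // join records of both streams whose timestamps are at most 5 minutes apart
+    KStream&lt;String, String&gt; joined = left.join(right,
+        (leftValue, rightValue) -> leftValue + "/" + rightValue,  // ValueJoiner
+        JoinWindows.of(TimeUnit.MINUTES.toMillis(5))
+    );
+    </pre>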
 
+    <h5><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate a stream</a></h5>
+    An <b>aggregation</b> operation takes one input stream and yields a new stream by combining multiple input records into a single output record. Examples of aggregations are computing counts or sums. An aggregation over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the aggregation may grow indefinitely.
 
+    <p>
+        In the Kafka Streams DSL, an input stream of an aggregation can be a <code>KStream</code> or a <code>KTable</code>, but the output stream will always be a <code>KTable</code>.
+        This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted.
+        When such late arrival happens, the aggregating <code>KStream</code> or <code>KTable</code> simply emits a new aggregate value. Because the output is a <code>KTable</code>, the new value is considered to overwrite the old value with the same key in subsequent processing steps.
+    </p>
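+    <p>
+        As a minimal sketch, assuming an input <code>KStream&lt;String, String&gt;</code> of words named <code>words</code> and a state store name <code>CountsStore</code> (both names are illustrative only), a windowed count could look like this:
+    </p>
+
+    <pre class="brush: java;">
+    // written in Java 8+, using lambda expressions
+    KStream&lt;String, String&gt; words = ...;
+
+    // count occurrences per word in 1-minute tumbling windows;
+    // the result is a KTable keyed by the windowed word
+    KTable&lt;Windowed&lt;String&gt;, Long&gt; wordCounts = words
+        .groupBy((key, word) -> word)
+        .count(TimeWindows.of(60000L), "CountsStore");
+    </pre>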
 
-    <h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h4>
+    <h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h4>
 
     <p>
-        At the end of the processing, users can choose to (continuously) write the final resulted streams back to a Kafka topic through
-        <code>KStream.to</code> and <code>KTable.to</code>.
+        Besides join and aggregation operations, Kafka Streams provides a number of other transformation operations for <code>KStream</code> and <code>KTable</code> respectively.
+        Each of these operations may generate one or more <code>KStream</code> and <code>KTable</code> objects and
+        is translated into one or more connected processors in the underlying processor topology.
+        All these transformation methods can be chained together to compose a complex processor topology.
+        Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as
+        generic functions where users can specify the input and output data types.
+    </p>
+
+    <p>
+        Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc., are stateless
+        transformation operations that can be applied to both <code>KStream</code> and <code>KTable</code>.
+        Users can usually pass a customized function to these operations as a parameter, such as a <code>Predicate</code> for <code>filter</code>
+        or a <code>KeyValueMapper</code> for <code>map</code>:
+
     </p>
 
     <pre class="brush: java;">
-        joined.to("topic4");
-        // or using custom Serdes and a StreamPartitioner
-        joined.to("topic5", Produced.with(keySerde, valueSerde, myStreamPartitioner));
+    // written in Java 8+, using lambda expressions
+    KStream&lt;String, GenericRecord&gt; mapped = source1.mapValues(record -> record.get("category"));
     </pre>
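+    <p>
+        Similarly, a stateless <code>filter</code> takes a <code>Predicate</code>; the predicate shown below is only an example:
+    </p>
+
+    <pre class="brush: java;">
+    // written in Java 8+, using lambda expressions
+    KStream&lt;String, GenericRecord&gt; filtered = source1.filter((key, record) -> record.get("category") != null);
+    </pre>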
 
-    If your application needs to continue reading and processing the records after they have been materialized
-    to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
-    Kafka Streams provides a convenience method called <code>through</code>:
+    <p>
+        Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
+        they do not require a state store associated with the stream processor. Stateful transformations, on the other hand,
+        require access to an associated state for processing and producing outputs.
+        For example, in <code>join</code> and <code>aggregate</code> operations, a windowing state is usually used to store all the received records
+        within the defined window boundary so far. The operators can then access these accumulated records in the store and compute
+        based on them.
+    </p>
 
     <pre class="brush: java;">
-        // equivalent to
-        //
-        // joined.to("topic4");
-        // materialized = builder.stream("topic4");
-        KStream&lt;String, String&gt; materialized = joined.through("topic4");
-        // if you need to provide serdes or a custom StreamPartitioner you can use
-        // the overloaded version
-        KStream&lt;String, String&gt; materialized = joined.through("topic5",
-                Produced.with(keySerde, valueSerde, myStreamPartitioner));
+    // written in Java 8+, using lambda expressions
+    KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
+        () -> 0L,  // initial value
+        (aggKey, value, aggregate) -> aggregate + 1L,  // aggregating value
+        TimeWindows.of(5000L).advanceBy(1000L),  // window size and advance interval in milliseconds
+        Serdes.Long(),  // serde for the aggregated value
+        "counts"  // state store name
+    );
+
+    KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
+        (record1, record2) -> record1.get("user") + "-" + record2.get("region")
+    );
     </pre>
-    <br>
 
-    <h4><a id="streams_dsl_build" href="#streams_dsl_build">Generate the processor topology</a></h4>
+    <h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h4>
 
     <p>
-        Within the Streams DSL, while users are specifying the operations to create / transform various streams as described above, a <code>Topology</code> is constructed implicitly within the <code>StreamsBuilder</code>.
-        Users can generate the constructed topology at any given point in time by calling <code>build</code>:
+        At the end of the processing, users can choose to (continuously) write the resulting streams back to a Kafka topic through
+        <code>KStream.to</code> and <code>KTable.to</code>.
     </p>
 
     <pre class="brush: java;">
-    Topology topology = builder.build();
+    joined.to("topic4");
     </pre>
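+    <p>
+        If the key and value types differ from the configured default serdes, the serdes can be passed explicitly (here assuming <code>String</code> keys and values; the topic name is just an example):
+    </p>
+
+    <pre class="brush: java;">
+    joined.to(Serdes.String(), Serdes.String(), "topic5");
+    </pre>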
 
-    <p>
-        Users can investigate the generated <code>Topology</code> via its <code>describe</code> API, and continue building or modifying the topology until they are satisfied with it.
-        The topology then can be used to execute the application (we will talk about this later in this section).
-    </p>
+    If your application needs to continue reading and processing the records after they have been materialized
+    to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
+    Kafka Streams provides a convenience method called <code>through</code>:
+
+    <pre class="brush: java;">
+    // equivalent to
+    //
+    // joined.to("topic4");
+    // materialized = builder.stream("topic4");
+    KStream&lt;String, String&gt; materialized = joined.through("topic4");
+    </pre>
+    <br>
 
     <h3><a id="streams_interactive_queries" href="#streams_interactive_queries">Interactive Queries</a></h3>
     <p>
@@ -2077,7 +415,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
 
 
     <figure>
-        <img class="centered" src="/{{version}}/images/streams-interactive-queries-01.png" style="width:600pt;">
+        <img class="centerd" src="/{{version}}/images/streams-interactive-queries-01.png" style="width:600pt;">
         <figcaption style="text-align: center;"><i>Without interactive queries: increased complexity and heavier footprint of architecture</i></figcaption>
     </figure>
 
@@ -2110,7 +448,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         </li>
     </ul>
 
-    <h4><a id="streams_developer-guide_interactive-queries_your_app" href="#streams_developer-guide_interactive-queries_your_app">Your application and interactive queries</a></h4>
+    <h4><a id="treams_developer-guide_interactive-queries_your_app" href="#treams_developer-guide_interactive-queries_your_app">Your application and interactive queries</a></h4>
     <p>
         Interactive queries allow you to tap into the <i>state</i> of your application, and notably to do that from outside your application.
         However, an application is not interactively queryable out of the box: you make it queryable by leveraging the API of Kafka Streams.
@@ -2175,7 +513,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     </p>
 
     <figure>
-        <img class="centered" src="/{{version}}/images/streams-interactive-queries-api-01.png" style="width:500pt;">
+        <img class="centerd" src="/{{version}}/images/streams-interactive-queries-api-01.png" style="width:500pt;">
         <figcaption style="text-align: center;"><i>Every application instance can directly query any of its local state stores</i></figcaption>
     </figure>
 
@@ -2205,8 +543,6 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         Kafka Streams materializes one state store per stream partition, which means your application will potentially manage many underlying state stores.
         The API to query local state stores enables you to query all of the underlying stores without having to know which partition the data is in.
         The objects returned from <code>KafkaStreams#store(...)</code> are therefore wrapping potentially many underlying state stores.
-        Note that it is the caller's responsibility to close the iterator on state store;
-        otherwise it may lead to OOM and leaked file handlers depending on the state store implementation.
     </p>
 
     <h4><a id="streams_developer-guide_interactive-queries_local-key-value-stores" href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying local key-value stores</a></h4>
@@ -2216,19 +552,19 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
 
     <pre class="brush: java;">
           StreamsConfig config = ...;
-          StreamsBuilder builder = ...;
+          KStreamBuilder builder = ...;
           KStream&lt;String, String&gt; textLines = ...;
 
           // Define the processing topology (here: WordCount)
           KGroupedStream&lt;String, String&gt; groupedByWord = textLines
             .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-            .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
+            .groupBy((key, word) -> word, stringSerde, stringSerde);
 
           // Create a key-value store named "CountsKeyValueStore" for the all-time word counts
           groupedByWord.count("CountsKeyValueStore");
 
           // Start an instance of the topology
-          KafkaStreams streams = new KafkaStreams(builder.build(), config);
+          KafkaStreams streams = new KafkaStreams(builder, config);
           streams.start();
         </pre>
 
@@ -2252,7 +588,6 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             KeyValue&lt;String, Long&gt; next = range.next();
             System.out.println("count for " + next.key + ": " + value);
           }
-          range.close(); // close iterator to avoid memory leak
 
           // Get the values for all of the keys available in this application instance
           KeyValueIterator&lt;String, Long&gt; range = keyValueStore.all();
@@ -2260,7 +595,6 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             KeyValue&lt;String, Long&gt; next = range.next();
             System.out.println("count for " + next.key + ": " + value);
           }
-          range.close(); // close iterator to avoid memory leak
         </pre>
 
     <h4><a id="streams_developer-guide_interactive-queries_local-window-stores" href="#streams_developer-guide_interactive-queries_local-window-stores">Querying local window stores</a></h4>
@@ -2274,19 +608,16 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
 
     <pre class="brush: java;">
           StreamsConfig config = ...;
-          StreamsBuilder builder = ...;
+          KStreamBuilder builder = ...;
           KStream&lt;String, String&gt; textLines = ...;
 
           // Define the processing topology (here: WordCount)
           KGroupedStream&lt;String, String&gt; groupedByWord = textLines
             .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-            .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
+            .groupBy((key, word) -> word, stringSerde, stringSerde);
 
           // Create a window state store named "CountsWindowStore" that contains the word counts for every minute
-          groupedByWord.windowedBy(TimeWindows.of(60000))
-            .count(Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("CountsWindowStore")
-                withKeySerde(Serdes.String()); // count() sets value serde to Serdes.Long() automatically
-        );
+          groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
         </pre>
 
     <p>
@@ -2309,7 +640,6 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             long windowTimestamp = next.key;
             System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
           }
-          iterator.close();
         </pre>
 
     <h4><a id="streams_developer-guide_interactive-queries_custom-stores" href="#streams_developer-guide_interactive-queries_custom-stores">Querying local custom state stores</a></h4>
@@ -2322,7 +652,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         <li>Your custom state store must implement <code>StateStore</code>.</li>
         <li>You should have an interface to represent the operations available on the store.</li>
         <li>It is recommended that you also provide an interface that restricts access to read-only operations so users of this API can't mutate the state of your running Kafka Streams application out-of-band.</li>
-        <li>You also need to provide an implementation of <code>StoreSupplier</code> for creating instances of your store.</li>
+        <li>You also need to provide an implementation of <code>StateStoreSupplier</code> for creating instances of your store.</li>
     </ol>
 
     <p>
@@ -2344,7 +674,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             V read(K key);
           }
 
-          public class MyCustomStoreSupplier implements StoreSupplier {
+          public class MyCustomStoreSupplier implements StateStoreSupplier {
             // implementation of the supplier for MyCustomStore
           }
         </pre>
@@ -2388,7 +718,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         <code>StateStoreProvider#stores(String storeName, QueryableStoreType&lt;T&gt; queryableStoreType)</code> returns a <code>List</code> of state stores with the given <code>storeName</code> and of the type as defined by <code>queryableStoreType</code>.
     </p>
     <p>
-        An example implementation of the wrapper follows (Java 8+):
+        An example implementation of the wrapper follows (Java 8+):
     </p>
 
     <pre class="brush: java;">
@@ -2426,19 +756,19 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
 
     <pre class="brush: java;">
           StreamsConfig config = ...;
-          Topology topology = ...;
+          TopologyBuilder builder = ...;
           ProcessorSupplier processorSupplier = ...;
 
-          // Create CustomStoreBuilder for store name the-custom-store
-          MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store");
+          // Create CustomStoreSupplier for store name the-custom-store
+          MyCustomStoreSupplier customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
           // Add the source topic
-          topology.addSource("input", "inputTopic");
+          builder.addSource("input", "inputTopic");
           // Add a custom processor that reads from the source topic
-          topology.addProcessor("the-processor", processorSupplier, "input");
+          builder.addProcessor("the-processor", processorSupplier, "input");
           // Connect your custom state store to the custom processor above
-          topology.addStateStore(customStoreBuilder, "the-processor");
+          builder.addStateStore(customStoreSupplier, "the-processor");
 
-          KafkaStreams streams = new KafkaStreams(topology, config);
+          KafkaStreams streams = new KafkaStreams(builder, config);
           streams.start();
 
           // Get access to the custom store
@@ -2501,20 +831,20 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
           // ... further settings may follow here ...
 
           StreamsConfig config = new StreamsConfig(props);
-          StreamsBuilder builder = new StreamsBuilder();
+          KStreamBuilder builder = new KStreamBuilder();
 
-          KStream&lt;String, String&gt; textLines = builder.stream("word-count-input", Consumed.with(stringSerde, stringSerde);
+          KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "word-count-input");
 
           KGroupedStream&lt;String, String&gt; groupedByWord = textLines
               .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-              .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
+              .groupBy((key, word) -> word, stringSerde, stringSerde);
 
           // This call to `count()` creates a state store named "word-count".
           // The state store is discoverable and can be queried interactively.
-          groupedByWord.count(Materialized.&ltString, Long,  KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("word-count"));
+          groupedByWord.count("word-count");
 
           // Start an instance of the topology
-          KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+          KafkaStreams streams = new KafkaStreams(builder, config);
           streams.start();
 
           // Then, create and start the actual RPC service for remote access to this
@@ -2575,7 +905,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
           Optional&lt;Long&gt; result = streams.allMetadataForStore("word-count")
               .stream()
               .map(streamsMetadata -> {
-                  // Construct the (fictitious) full endpoint URL to query the current remote application instance
+                  // Construct the (fictitious) full endpoint URL to query the current remote application instance
                   String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
                   // Read and return the count for 'alice', if any.
                   return http.getLong(url);
@@ -2604,7 +934,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     </p>
 
     <ol>
-        <li>Source <code>KTable</code>, i.e. <code>KTable</code> instances that are created via <code>StreamBuilder#table()</code> or <code>StreamBuilder#globalTable()</code>.</li>
+        <li>Source <code>KTable</code>, i.e. <code>KTable</code> instances that are created via <code>KStreamBuilder#table()</code> or <code>KStreamBuilder#globalTable()</code>.</li>
         <li>Aggregation <code>KTable</code>, i.e. instances of <code>KTable</code> that are created as a result of aggregations</li>
     </ol>
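+    <p>
+        For example (a sketch only; the topic, stream, and store names are illustrative), both kinds of <code>KTable</code> can be given a store name when they are created:
+    </p>
+
+    <pre class="brush: java;">
+    KStreamBuilder builder = ...;
+    KStream&lt;String, String&gt; stream = ...;
+
+    // source KTable, materialized in the state store "stateStoreName"
+    KTable&lt;String, String&gt; table = builder.table("topic3", "stateStoreName");
+
+    // aggregation KTable, materialized in the state store "CountsStore"
+    KTable&lt;String, Long&gt; counts = stream.groupByKey().count("CountsStore");
+    </pre>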
     <p>
@@ -2733,14 +1063,18 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         might not want to use the unified record cache for both state store and forwarding downstream.
     </p>
     <p>
-        Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching,
-        you first create a <code>StateStoreBuilder</code> and then call <code>withCachingEnabled</code> (note that caches
-        are disabled by default and there is no explicit <code>withCachingDisabled</code> call) :
+        Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching, you can
+        add the <code>enableCaching</code> call (note that caches are disabled by default and there is no explicit <code>disableCaching</code>
+        call):
     </p>
     <pre class="brush: java;">
-        KeyValueBytesStoreSupplier countSupplier = Stores.persistentKeyValueStore("Counts");
-        StateStoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; builder = Stores.keyValueStoreBuilder(countSupplier, Serdes.String(), Serdes.Long());
-        builder.withCachingEnabled()
+        StateStoreSupplier countStoreSupplier =
+            Stores.create("Counts")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.Long())
+                .persistent()
+                .enableCaching()
+                .build();
     </pre>
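+    <p>
+        As with any other state store, the cached store can then be attached to a processor; this is a sketch that reuses the builder and the processor name from the custom state store example above:
+    </p>
+    <pre class="brush: java;">
+        TopologyBuilder builder = ...;
+        // make the cached store available to the processor that will use it
+        builder.addStateStore(countStoreSupplier, "the-processor");
+    </pre>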
 
     <h4><a id="streams_developer-guide_memory-management_other_memory_usage" href="#streams_developer-guide_memory-management_other_memory_usage">Other memory usage</a></h4>
@@ -2804,7 +1138,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     StreamsConfig config = new StreamsConfig(settings);
     </pre>
 
-    <h4><a id="streams_client_config" href="#streams_client_config">Producer and Consumer Configuration</a></h4>
+    <h4><a id="streams_client_config" href="#streams_clients_config">Producer and Consumer Configuration</a></h4>
     <p>
         Apart from Kafka Streams' own configuration parameters you can also specify parameters for the Kafka consumers and producers that are used internally,
         depending on the needs of your application. Similar to the Streams settings you define any such consumer and/or producer settings via <code>StreamsConfig</code>.
@@ -2848,41 +1182,6 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         So we can avoid the situation where one instance is assigned all tasks, begins restoring/processing, only to shortly after be rebalanced, and then have to start again with half of the tasks and so on.
     </p>
 
-    <h4><a id="streams_topic_config" href="#streams_topic_config">Internal Topic Configuration</a></h4>
-    <p>
-        Kafka Streams automatically creates internal repartitioning and changelog topics.
-        You can override the default configs used when creating these topics by adding any configs from <code>TopicConfig</code> to your <code>StreamsConfig</code> with the prefix <code>StreamsConfig.TOPIC_PREFIX</code>:
-    </p>
-
-    <pre class="brush: java;">
-    Properties settings = new Properties();
-    // Example of a "normal" setting for Kafka Streams
-    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
-
-    // Add a topic config by prefixing with topic.
-    settings.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.SEGMENT_BYTES_CONFIG, 1024 * 1024);
-
-    // Alternatively, you can use
-    settings.put(StreamsConfig.topicPrefix(ConsumerConfig.SEGMENT_BYTES_CONFIG), 1024 * 1024);
-    </pre>
-
-    <p>
-        For changelog topics you can also override the default configs on a per store basis.
-        This can be done by using any method overload that has a <code>Materialized</code> as a parameter:
-    </p>
-
-    <pre class="brush: java;">
-        // a map to add topic config
-        Map&lt;String, String&gt; topicConfig = new HashMap&lt;&gt;();
-        topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000");
-
-        final Materialized&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt; materialized = Materialized.as("store")
-            .withKeySerde(Serdes.String())
-            .withValueSerde(Serdes.String())
-            .withLoggingEnabled(topicConfig); // pass in the config overrides
-
-        groupedStream.count(materialized)
-    </pre>
 
     <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams Application</a></h4>
     <p>
@@ -2891,35 +1190,33 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     </p>
 
     <p>
-        First, you must create an instance of <code>KafkaStreams</code>.
-        The first argument of the <code>KafkaStreams</code> constructor takes an instance of <code>Topology</code>.
-        This topology can be either created directly following the <code>Processor</code> API or implicitly via the <code>StreamsBuilder</code> in the higher-level Streams DSL.
-        The second argument is an instance of <code>StreamsConfig</code> mentioned above.
+        First, you must create an instance of <code>KafkaStreams</code>. The first argument of the <code>KafkaStreams</code> constructor takes a topology
+        builder (either <code>KStreamBuilder</code> for the Kafka Streams DSL, or <code>TopologyBuilder</code> for the Processor API)
+        that is used to define a topology. The second argument is an instance of <code>StreamsConfig</code> mentioned above.
     </p>
 
     <pre class="brush: java;">
     import org.apache.kafka.streams.KafkaStreams;
-    import org.apache.kafka.streams.StreamsBuilder;
     import org.apache.kafka.streams.StreamsConfig;
-    import org.apache.kafka.streams.Topology;
+    import org.apache.kafka.streams.kstream.KStreamBuilder;
+    import org.apache.kafka.streams.processor.TopologyBuilder;
 
     // Use the builders to define the actual processing topology, e.g. to specify
     // from which input topics to read, which stream operations (filter, map, etc.)
     // should be called, and so on.
 
-    Topology topology = ...; // when using the Processor API
+    KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
     //
     // OR
     //
-    StreamsBuilder builder = ...;  // when using the Kafka Streams DSL
-    Topology topology = builder.build();
+    TopologyBuilder builder = ...; // when using the Processor API
 
     // Use the configuration to tell your application where the Kafka cluster is,
     // which serializers/deserializers to use by default, to specify security settings,
     // and so on.
     StreamsConfig config = ...;
 
-    KafkaStreams streams = new KafkaStreams(topology, config);
+    KafkaStreams streams = new KafkaStreams(builder, config);
     </pre>
 
     <p>
@@ -2944,16 +1241,6 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     </pre>
 
     <p>
-        To retrieve information about the local running threads, you can use the <code>localThreadsMetadata()</code> method after you start the application.
-    </p>
-
-    <pre class="brush: java;">
-    // For instance, use this method to print/monitor the partitions assigned to each local tasks.
-    Set&lt;ThreadMetadata&gt; threads = streams.localThreadsMetadata();
-    ...
-    </pre>
-
-    <p>
         To stop the application instance call the <code>close()</code> method:
     </p>
 
@@ -3005,21 +1292,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
 $(function() {
   // Show selected style on nav item
   $('.b-nav__streams').addClass('selected');
-      
-   //sticky secondary nav
-   var $navbar = $(".sub-nav-sticky"),
-               y_pos = $navbar.offset().top,
-               height = $navbar.height();
-       
-              $(window).scroll(function() {
-                 var scrollTop = $(window).scrollTop();
-           
-                 if (scrollTop > y_pos - height) {
-                   $navbar.addClass("navbar-fixed")
-                 } else if (scrollTop <= y_pos) {
-                   $navbar.removeClass("navbar-fixed")
-                 }
-           });
+
   // Display docs subnav items
   $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
 });
diff --git a/0110/streams/index.html b/0110/streams/index.html
index d771266..fd7a62f 100644
--- a/0110/streams/index.html
+++ b/0110/streams/index.html
@@ -1,360 +1,258 @@
 <!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-   -->
-<script>
-  <!--#include virtual="../js/templateData.js" -->
-</script>
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../js/templateData.js" --></script>
+
 <script id="streams-template" type="text/x-handlebars-template">
-  <h1>Kafka Streams API</h1>
-       <div class="sub-nav-sticky">
-      <div class="sticky-top">
-        <div style="height:35px">
-          <a  class="active-menu-item" href="/{{version}}/documentation/streams/">Introduction</a>
-          <a href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
-          <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
-          <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
-          <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+    <h1>Kafka Streams API</h1>
+
+    <h3 style="max-width: 75rem;">The easiest way to write mission-critical real-time applications and microservices with all the benefits of Kafka's server-side cluster technology.</h3>
+
+    <div class="hero">
+        <div class="hero__diagram">
+            <img src="/{{version}}/images/streams-welcome.png" />
+        </div>
+        <div class="hero__cta">
+            <a href="/{{version}}/documentation/streams/tutorial" class="btn">Write your first app</a>
+            <a href="/{{version}}/documentation/streams/quickstart" class="btn">Play with demo app</a>
         </div>
-      </div>
-     
     </div>
-       <h3 class="streams_intro">The easiest way to write mission-critical real-time applications and microservices</h3>
-       <p class="streams__description">Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.</p>
-       <div class="video__series__grid">
-          <div class="yt__video__block">
-            <div class="yt__video__inner__block">
-                <iframe  class="yt_series video_1 active" style="display:block" src="https://www.youtube.com/embed/Z3JKCLG3VP4?rel=0&showinfo=0&end=602" frameborder="0" allowfullscreen></iframe>
-                <iframe  class="yt_series video_2" src="https://www.youtube.com/embed/LxxeXI1mPKo?rel=0&showinfo=0&end=622" frameborder="0" allowfullscreen></iframe>
-                <iframe  class="yt_series video_3" src="https://www.youtube.com/embed/7JYEEx7SBuE?rel=0&showinfo=0end=557" frameborder="0" allowfullscreen></iframe>
-                <iframe  class="yt_series video_4" src="https://www.youtube.com/embed/3kJgYIkAeHs?rel=0&showinfo=0&end=564" frameborder="0" allowfullscreen></iframe>
-            </div>
+
+    <ul class="feature-list">
+        <li>Write standard Java applications</li>
+        <li>Exactly-once processing semantics</li>
+        <li>No separate processing cluster required</li>
+        <li>Develop on Mac, Linux, Windows</li>
+        <li>Elastic, highly scalable, fault-tolerant</li>
+        <li>Deploy to containers, VMs, bare metal, cloud</li>
+        <li>Equally viable for small, medium, &amp; large use cases</li>
+        <li>Fully integrated with Kafka security</li>
+    </ul>
+
+    <div class="cards">
+        <a class="card" href="/{{version}}/documentation/streams/developer-guide">
+            <img class="card__icon" src="/{{version}}/images/icons/documentation.png" />
+            <img class="card__icon card__icon--hover" src="/{{version}}/images/icons/documentation--white.png" />
+            <span class="card__label">Developer manual</span>
+        </a>
+        <a class="card" href="/{{version}}/documentation/streams/tutorial">
+            <img class="card__icon" src="/{{version}}/images/icons/tutorials.png" />
+            <img class="card__icon card__icon--hover" src="/{{version}}/images/icons/tutorials--white.png" />
+            <span class="card__label">Tutorials</span>
+        </a>
+        <a class="card" href="/{{version}}/documentation/streams/core-concepts">
+            <img class="card__icon" src="/{{version}}/images/icons/architecture.png" />
+            <img class="card__icon card__icon--hover" src="/{{version}}/images/icons/architecture--white.png" />
+            <span class="card__label">Concepts</span>
+        </a>
+    </div>
+
+    <h3>Hello Kafka Streams</h3>
+    <p>The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.</p>
+
+    <div class="code-example">
+        <div class="btn-group">
+            <a class="selected b-java-8" data-section="java-8">Java 8+</a>
+            <a class="b-java-7" data-section="java-7">Java 7</a>
+            <a class="b-scala" data-section="scala">Scala</a>
+        </div>
+
+        <div class="code-example__snippet b-java-8 selected">
+            <pre class="brush: java;">
+                import org.apache.kafka.common.serialization.Serdes;
+                import org.apache.kafka.streams.KafkaStreams;
+                import org.apache.kafka.streams.StreamsConfig;
+                import org.apache.kafka.streams.kstream.KStream;
+                import org.apache.kafka.streams.kstream.KStreamBuilder;
+                import org.apache.kafka.streams.kstream.KTable;
+
+                import java.util.Arrays;
+                import java.util.Properties;
+
+                public class WordCountApplication {
+
+                    public static void main(final String[] args) throws Exception {
+                        Properties config = new Properties();
+                        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
+                        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+                        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                        KStreamBuilder builder = new KStreamBuilder();
+                        KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
+                        KTable&lt;String, Long&gt; wordCounts = textLines
+                            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
+                            .groupBy((key, word) -> word)
+                            .count("Counts");
+                        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
+
+                        KafkaStreams streams = new KafkaStreams(builder, config);
+                        streams.start();
+                    }
+
+                }
+            </pre>
+        </div>
+
+        <div class="code-example__snippet b-java-7">
+            <pre class="brush: java;">
+                import org.apache.kafka.common.serialization.Serdes;
+                import org.apache.kafka.streams.KafkaStreams;
+                import org.apache.kafka.streams.StreamsConfig;
+                import org.apache.kafka.streams.kstream.KStream;
+                import org.apache.kafka.streams.kstream.KStreamBuilder;
+                import org.apache.kafka.streams.kstream.KTable;
+                import org.apache.kafka.streams.kstream.KeyValueMapper;
+                import org.apache.kafka.streams.kstream.ValueMapper;
+
+                import java.util.Arrays;
+                import java.util.Properties;
+
+                public class WordCountApplication {
+
+                    public static void main(final String[] args) throws Exception {
+                        Properties config = new Properties();
+                        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
+                        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+                        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+                        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+                        KStreamBuilder builder = new KStreamBuilder();
+                        KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
+                        KTable&lt;String, Long&gt; wordCounts = textLines
+                            .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                                @Override
+                                public Iterable&lt;String&gt; apply(String textLine) {
+                                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
+                                }
+                            })
+                            .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
+                                @Override
+                                public String apply(String key, String word) {
+                                    return word;
+                                }
+                            })
+                            .count("Counts");
+                        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
+
+                        KafkaStreams streams = new KafkaStreams(builder, config);
+                        streams.start();
+                    }
+
+                }
+            </pre>
         </div>
-        <div class="video__block">
-            <h3>TOUR OF THE STREAMS API</h3>
-            <div class="video__list">
-                <p class="video__item video_list_1 active" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_1').show();">
-                    <span class="number">1</span><span class="video__text">Intro to Streams</span>
-                </p>
-                <p class="video__item video_list_2" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_2').show();">
-                    <span class="number">2</span><span class="video__text">Creating a Streams Application</span>
-                </p>
-                <p class="video__item video_list_3" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_3').show();">
-                    <span class="number">3</span><span class="video__text">Transforming Data Pt. 1</span>
-                </p>
-                <p class="video__item video_list_4" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_4').show();">
-                    <span class="number">4</span><span class="video__text">Transforming Data Pt. 11</span>
-                </p>
-            </div>
+
+        <div class="code-example__snippet b-scala">
+            <pre class="brush: scala;">
+                import java.lang.Long
+                import java.util.Properties
+                import java.util.concurrent.TimeUnit
+
+                import org.apache.kafka.common.serialization._
+                import org.apache.kafka.streams._
+                import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
+
+                import scala.collection.JavaConverters.asJavaIterableConverter
+
+                object WordCountApplication {
+
+                    def main(args: Array[String]) {
+                        val config: Properties = {
+                            val p = new Properties()
+                            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
+                            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
+                            p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
+                            p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
+                            p
+                        }
+
+                        val builder: KStreamBuilder = new KStreamBuilder()
+                        val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
+                        val wordCounts: KTable[String, Long] = textLines
+                            .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
+                            .groupBy((_, word) => word)
+                            .count("Counts")
+                        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")
+
+                        val streams: KafkaStreams = new KafkaStreams(builder, config)
+                        streams.start()
+
+                        Runtime.getRuntime.addShutdownHook(new Thread(() => {
+                            streams.close(10, TimeUnit.SECONDS)
+                        }))
+                    }
+
+                }
+            </pre>
         </div>
-       </div>
-       <hr class="separator"> 
-       <div class="use-item-section">
-           <div class="use__list__sec">
-               <h3>Why you'll love using Kafka Streams!</h3>
-               <ul class="use-feature-list">
-                  <li>Elastic, highly scalable, fault-tolerant</li>
-                  <li>Deploy to containers, VMs, bare metal, cloud</li>
-                  <li>Equally viable for small, medium, &amp; large use cases</li>
-                  <li>Fully integrated with Kafka security</li>
-                  <li>Write standard Java applications</li>
-                  <li>Exactly-once processing semantics</li>
-                  <li>No seperate processing cluster required</li>
-                  <li>Develop on Mac, Linux, Windows</li>
-                  
-               </ul>
-           </div>
-           <div class="first__app__cta">
-               <a href="/{{version}}/documentation/streams/tutorial" class="first__app__btn">Write your first app</a>
-           </div>
-       </div>
-       <hr class="separator"> 
-        <h3 class="stream__text" id="streams-use-cases"><a href="#streams-use-cases">Streams API use cases</a></h3>
-         <div class="customers__grid">
-           <div class="customer__grid">
-             <div class="customer__item streams_logo_grid streams__ny__grid">
-               <a href="https://open.nytimes.com/publishing-with-apache-kafka-at-the-new-york-times-7f0e3b7d2077" target="_blank" class="grid__logo__link">
-                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/NYT.jpg');"></span>
-               </a>
-               <p class="grid__item__customer__description extra__space">
-                 <a href="https://open.nytimes.com/publishing-with-apache-kafka-at-the-new-york-times-7f0e3b7d2077" target="_blank">The New York Times uses Apache Kafka </a>and the Kafka Streams API to store and distribute, in real-time, published content to the various applications and systems that make it available to the readers.
-               </p>
-             </div>
-           </div>
-           <div class="customer__grid">
-             <div class="customer__item  streams_logo_grid streams__zalando__grid">
-               <a href="https://www.confluent.io/blog/ranking-websites-real-time-apache-kafkas-streams-api/" target="_blank" class="grid__logo__link">
-                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/zalando.jpg');"></span>
-               </a>
-               <p class="grid__item__customer__description extra__space">As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing
-                 <a href="https://www.confluent.io/blog/ranking-websites-real-time-apache-kafkas-streams-api/" target='blank'> event streams</a> enables our technical team to do near-real time business intelligence.
-               </p>
-           </div>
-           </div>  
-           <div class="customer__grid">
-             <div class="customer__item  streams_logo_grid streams__line__grid">
-               <a href="https://engineering.linecorp.com/en/blog/detail/80" target="_blank" class="grid__logo__link">
-                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/line.svg');width:9rem"></span>
-               </a>
-                 <p class="grid__item__customer__description extra__space"><a href="https://engineering.linecorp.com/en/blog/detail/80" target="_blank">LINE uses Apache Kafka</a> as a central datahub for our services to communicate to one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing and data analysis. LINE leverages Kafka Streams to reliably transform and filter topics enabling sub topics co [...]
-             </div>
-           </div>
-           <div class="customer__grid">
-             <div class="customer__item streams_logo_grid streams__ny__grid">
-               <a href="https://medium.com/@Pinterest_Engineering/using-kafka-streams-api-for-predictive-budgeting-9f58d206c996" target="_blank" class="grid__logo__link">
-                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/pinterest.png');"></span>
-               </a>
-               <p class="grid__item__customer__description">
-                   <a href="https://medium.com/@Pinterest_Engineering/using-kafka-streams-api-for-predictive-budgeting-9f58d206c996" target="_blank">Pinterest uses Apache Kafka and the Kafka Streams API</a> at large scale to power the real-time, predictive budgeting system of their advertising infrastructure. With Kafka Streams, spend predictions are more accurate than ever.
-               </p>
-             </div>
-           </div> 
-           <div class="customer__grid">
-             <div class="customer__item  streams_logo_grid streams__rabobank__grid">
-               <a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/" target="_blank" class="grid__logo__link">
-                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/rabobank.jpg');"></span>
-               </a>
-                 <p class="grid__item__customer__description">Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing amount of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real-time upon financial events and is <a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/" target="_blank" [...]
-             </div>
-           </div>        
-           <div class="customer__grid">
-             <div class="customer__item streams_logo_grid streams__ny__grid">
-               <a href="https://speakerdeck.com/xenji/kafka-and-debezium-at-trivago-code-dot-talks-2017-edition" target="_blank" class="grid__logo__link">
-                 <span class="grid__item__logo" style="background-image: url('/images/powered-by/trivago.png');"></span>
-               </a>
-               <p class="grid__item__customer__description">
-                   Trivago is a global hotel search platform. We are focused on reshaping the way travelers search for and compare hotels, while enabling hotel advertisers to grow their businesses by providing access to a broad audience of travelers via our websites and apps. As of 2017, we offer access to approximately 1.8 million hotels and other accommodations in over 190 countries. We use Kafka, Kafka Connect, and Kafka Streams to <a href="https://speakerdeck.com/xenji/kafka-and-debe [...]
-               </p>
-             </div>
-           </div>  
-         
-       </div>
-       <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
-       <p>The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale</p>
-       
-       <div class="code-example">
-           <div class="btn-group">
-               <a class="selected b-java-8" data-section="java-8">Java 8+</a>
-               <a class="b-java-7" data-section="java-7">Java 7</a>
-               <a class="b-scala" data-section="scala">Scala</a>
-           </div>
-       
-           <div class="code-example__snippet b-java-8 selected">
-               <pre class="brush: java;">
-                   import org.apache.kafka.common.serialization.Serdes;
-                   import org.apache.kafka.streams.KafkaStreams;
-                   import org.apache.kafka.streams.StreamsConfig;
-                   import org.apache.kafka.streams.kstream.KStream;
-                   import org.apache.kafka.streams.kstream.KStreamBuilder;
-                   import org.apache.kafka.streams.kstream.KTable;
-       
-                   import java.util.Arrays;
-                   import java.util.Properties;
-       
-                   public class WordCountApplication {
-       
-                       public static void main(final String[] args) throws Exception {
-                           Properties config = new Properties();
-                           config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
-                           config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
-                           config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-                           config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-       
-                           KStreamBuilder builder = new KStreamBuilder();
-                           KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
-                           KTable&lt;String, Long&gt; wordCounts = textLines
-                               .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
-                               .groupBy((key, word) -> word)
-                               .count("Counts");
-                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
-       
-                           KafkaStreams streams = new KafkaStreams(builder, config);
-                           streams.start();
-                       }
-       
-                   }
-               </pre>
-           </div>
-       
-           <div class="code-example__snippet b-java-7">
-               <pre class="brush: java;">
-                   import org.apache.kafka.common.serialization.Serdes;
-                   import org.apache.kafka.streams.KafkaStreams;
-                   import org.apache.kafka.streams.StreamsConfig;
-                   import org.apache.kafka.streams.kstream.KStream;
-                   import org.apache.kafka.streams.kstream.KStreamBuilder;
-                   import org.apache.kafka.streams.kstream.KTable;
-                   import org.apache.kafka.streams.kstream.KeyValueMapper;
-                   import org.apache.kafka.streams.kstream.ValueMapper;
-       
-                   import java.util.Arrays;
-                   import java.util.Properties;
-       
-                   public class WordCountApplication {
-       
-                       public static void main(final String[] args) throws Exception {
-                           Properties config = new Properties();
-                           config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
-                           config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
-                           config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-                           config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-       
-                           KStreamBuilder builder = new KStreamBuilder();
-                           KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
-                           KTable&lt;String, Long&gt; wordCounts = textLines
-                               .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
-                                   @Override
-                                   public Iterable&lt;String&gt; apply(String textLine) {
-                                       return Arrays.asList(textLine.toLowerCase().split("\\W+"));
-                                   }
-                               })
-                               .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
-                                   @Override
-                                   public String apply(String key, String word) {
-                                       return word;
-                                   }
-                               })
-                               .count("Counts");
-                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
-       
-                           KafkaStreams streams = new KafkaStreams(builder, config);
-                           streams.start();
-                       }
-       
-                   }
-               </pre>
-           </div>
-       
-           <div class="code-example__snippet b-scala">
-               <pre class="brush: scala;">
-                   import java.lang.Long
-                   import java.util.Properties
-                   import java.util.concurrent.TimeUnit
-       
-                   import org.apache.kafka.common.serialization._
-                   import org.apache.kafka.streams._
-                   import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
-       
-                   import scala.collection.JavaConverters.asJavaIterableConverter
-       
-                   object WordCountApplication {
-       
-                       def main(args: Array[String]) {
-                           val config: Properties = {
-                               val p = new Properties()
-                               p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
-                               p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
-                               p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
-                               p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
-                               p
-                           }
-       
-                           val builder: KStreamBuilder = new KStreamBuilder()
-                           val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
-                           val wordCounts: KTable[String, Long] = textLines
-                               .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
-                               .groupBy((_, word) => word)
-                               .count("Counts")
-                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")
-       
-                           val streams: KafkaStreams = new KafkaStreams(builder, config)
-                           streams.start()
-       
-                           Runtime.getRuntime.addShutdownHook(new Thread(() => {
-                               streams.close(10, TimeUnit.SECONDS)
-                           }))
-                       }
-       
-                   }
-               </pre>
-           </div>
-       </div>
-       
-       <div class="pagination">
-           <a href="#" class="pagination__btn pagination__btn__prev pagination__btn--disabled">Previous</a>
-           <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__next">Next</a>
-       </div>
-     
+    </div>
+    <h3 class="customer-title">See how Kafka Streams is being used</h3>
+    <div class="customer__cards">
+         <div class="customer__card">
+             <div class="customer__card__icon">
+                 <img src="/{{version}}/images/icons/rabobank.png">
+             </div>  
+             <span class="customer__card__label">Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka and Kafka Streams.
+             </span>
+             <a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/">Learn More</a>
+         </div>
+         <div class="customer__card customer-right" >
+              <div class="customer__card__icon" >
+                 <img src="/{{version}}/images/icons/zalando.png">
+              </div>  
+              <span class="customer__card__label">As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing event streams enables our technical team to do near-real time business intelligence.</span>
+              <a href="https://kafka-summit.org/sessions/using-kstreams-ktables-calculate-real-time-domain-rankings/">Learn More</a>
+         </div>
+     </div>
+
+    <div class="pagination">
+        <a href="#" class="pagination__btn pagination__btn__prev pagination__btn--disabled">Previous</a>
+        <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__next">Next</a>
+    </div>
 </script>
+
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
 <div class="content documentation documentation--current">
-  <!--#include virtual="../../includes/_nav.htm" -->
-  <div class="right">
-    <!--#include virtual="../../includes/_docs_banner.htm" -->
-    <ul class="breadcrumbs">
-      <li><a href="/documentation">Documentation</a>
-      </li>
-    </ul>
-    <div class="p-streams"></div>
-  </div>
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+        </ul>
+        <div class="p-streams"></div>
+    </div>
 </div>
 <!--#include virtual="../../includes/_footer.htm" -->
+
 <script>
-  $(function() {
-         
-         // Show selected style on nav item
-         $('.b-nav__streams').addClass('selected');
-    
-         $('.video_list_1').click(function(){    
-             $('.video_2').attr('src', $('.video_2').attr('src'));
-             $('.video_3').attr('src', $('.video_3').attr('src'));
-             $('.video_4').attr('src', $('.video_4').attr('src'));
-
-           });
-
-         $('.video_list_2').click(function(){    
-               $('.video_1').attr('src', $('.video_1').attr('src'));
-               $('.video_3').attr('src', $('.video_3').attr('src'));
-               $('.video_4').attr('src', $('.video_4').attr('src'));
-
-           });
-
-         $('.video_list_3').click(function(){    
-              $('.video_1').attr('src', $('.video_1').attr('src'));
-              $('.video_2').attr('src', $('.video_2').attr('src'));
-              $('.video_4').attr('src', $('.video_4').attr('src'));
-           });
-
-         $('.video_list_4').click(function(){    
-              $('.video_1').attr('src', $('.video_1').attr('src'));
-              $('.video_2').attr('src', $('.video_2').attr('src'));
-              $('.video_3').attr('src', $('.video_3').attr('src'));
-           });
-           
-
-          //sticky secondary nav
-          var $navbar = $(".sub-nav-sticky"),
-               y_pos = $navbar.offset().top,
-               height = $navbar.height();
-       
-           $(window).scroll(function() {
-               var scrollTop = $(window).scrollTop();
-           
-               if (scrollTop > y_pos - height) {
-                   $navbar.addClass("navbar-fixed")
-               } else if (scrollTop <= y_pos) {
-                   $navbar.removeClass("navbar-fixed")
-               }
-           });
-       
-         // Display docs subnav items
-         $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
-         // Show selected code example
-         $('.btn-group a').click(function(){
-             var targetClass = '.b-' + $(this).data().section;
-             $('.code-example__snippet, .btn-group a').removeClass('selected');
-             $(targetClass).addClass('selected');
-         });
-       });
-</script>
\ No newline at end of file
+$(function() {
+  // Show selected style on nav item
+  $('.b-nav__streams').addClass('selected');
+
+  // Display docs subnav items
+  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+
+  // Show selected code example
+  $('.btn-group a').click(function(){
+      var targetClass = '.b-' + $(this).data().section;
+      $('.code-example__snippet, .btn-group a').removeClass('selected');
+      $(targetClass).addClass('selected');
+  });
+});
+</script>
diff --git a/0110/streams/quickstart.html b/0110/streams/quickstart.html
index 3a32f0d..977fa5f 100644
--- a/0110/streams/quickstart.html
+++ b/0110/streams/quickstart.html
@@ -17,23 +17,12 @@
 <script><!--#include virtual="../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-  <h1>Run Streams Demo Application</h1> 
-  <div class="sub-nav-sticky">
-      <div class="sticky-top">
-        <div style="height:35px">
-          <a href="/{{version}}/documentation/streams/">Introduction</a>
-          <a href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
-          <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
-          <a class="active-menu-item" href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
-          <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
-        </div>
-      </div>
-  </div> 
-
- <p>
+  <h1>Play with a Streams Application</h1>
+
+<p>
   This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. However, if you have already started Kafka and
-  ZooKeeper, feel free to skip the first two steps.
- </p>
+  ZooKeeper, feel free to skip the first two steps.
+</p>
 
   <p>
  Kafka Streams is a client library for building mission-critical real-time applications and microservices,
@@ -328,7 +317,7 @@ Looking beyond the scope of this concrete example, what Kafka Streams is doing h
 
 <h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: Teardown the application</a></h4>
 
-<p>You can now stop the console consumer, the console producer, the Wordcount application, the Kafka broker and the ZooKeeper server in order via <b>Ctrl-C</b>.</p>
+<p>You can now stop the console consumer, the console producer, the Wordcount application, the Kafka broker and the ZooKeeper server in order via <b>Ctrl-C</b>.</p>
 
  <div class="pagination">
         <a href="/{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>
@@ -357,21 +346,6 @@ $(function() {
   // Show selected style on nav item
   $('.b-nav__streams').addClass('selected');
 
-     //sticky secondary nav
-          var $navbar = $(".sub-nav-sticky"),
-               y_pos = $navbar.offset().top,
-               height = $navbar.height();
-       
-           $(window).scroll(function() {
-               var scrollTop = $(window).scrollTop();
-           
-               if (scrollTop > y_pos - height) {
-                   $navbar.addClass("navbar-fixed")
-               } else if (scrollTop <= y_pos) {
-                   $navbar.removeClass("navbar-fixed")
-               }
-           });
-
   // Display docs subnav items
   $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
 });
diff --git a/0110/streams/tutorial.html b/0110/streams/tutorial.html
index f7e3fa5..78db8eb 100644
--- a/0110/streams/tutorial.html
+++ b/0110/streams/tutorial.html
@@ -17,18 +17,7 @@
 <script><!--#include virtual="../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-    <h1>Tutorial: Write a Streams Application</h1>
-    <div class="sub-nav-sticky">
-      <div class="sticky-top">
-        <div style="height:35px">
-          <a href="/{{version}}/documentation/streams/">Introduction</a>
-          <a href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
-          <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
-          <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
-          <a class="active-menu-item" href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
-        </div>
-      </div>
-    </div> 
+    <h1>Write your own Streams Applications</h1>
 
     <p>
         In this guide we will start from scratch on setting up your own project to write a stream processing application using Kafka's Streams API.
@@ -530,21 +519,6 @@ $(function() {
   // Show selected style on nav item
   $('.b-nav__streams').addClass('selected');
 
-     //sticky secondary nav
-          var $navbar = $(".sub-nav-sticky"),
-               y_pos = $navbar.offset().top,
-               height = $navbar.height();
-       
-           $(window).scroll(function() {
-               var scrollTop = $(window).scrollTop();
-           
-               if (scrollTop > y_pos - height) {
-                   $navbar.addClass("navbar-fixed")
-               } else if (scrollTop <= y_pos) {
-                   $navbar.removeClass("navbar-fixed")
-               }
-           });
-
   // Display docs subnav items
   $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
 });
diff --git a/0110/streams/upgrade-guide.html b/0110/streams/upgrade-guide.html
index 1f5f5b9..27345a6 100644
--- a/0110/streams/upgrade-guide.html
+++ b/0110/streams/upgrade-guide.html
@@ -27,16 +27,33 @@
     </p>
 
     <p>
-        If you want to upgrade from 0.10.1.x to 0.10.2, see the <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
+        If you want to upgrade from 0.10.1.x to 0.11.0, also see the <a href="/{{version}}/upgrade/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
         It highlights incompatible changes you need to consider to upgrade your code and application.
-        See <a href="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+        See below for a complete list of <a href="#streams_api_changes_0102">0.10.2</a> and <a href="#streams_api_changes_0110">0.11.0</a> API and semantic changes
+        that allow you to advance your application and/or simplify your code base, including the usage of new features.
     </p>
 
     <p>
-        If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams"><b>Upgrade Section for 0.10.1</b></a>.
-        It highlights incompatible changes you need to consider to upgrade your code and application.
-        See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+        Upgrading from 0.10.0.x to 0.11.0.x directly is also possible.
+        Note that brokers must be on version 0.10.1 or higher to run a Kafka Streams application of version 0.10.1 or higher.
+        See <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>, <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a>,
+        and <a href="#streams_api_changes_0110">Streams API changes in 0.11.0</a> for a complete list of API changes.
+        Upgrading to 0.11.0.3 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+        As an alternative, an offline upgrade is also possible.
     </p>
+    <ul>
+        <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for the new version 0.11.0.3 (see the configuration sketch below) </li>
+        <li> bounce each instance of your application once </li>
+        <li> prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+        <li> bounce each instance of your application once more to complete the upgrade </li>
+    </ul>
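+    <p>
+        A minimal sketch of the first bounce phase is shown below (the application id, broker address, topic names, and class name
+        are placeholders; only the <code>upgrade.from</code> entry is specific to this upgrade path):
+    </p>
+    <pre class="brush: java;">
+        import java.util.Properties;
+
+        import org.apache.kafka.streams.KafkaStreams;
+        import org.apache.kafka.streams.StreamsConfig;
+        import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+        public class RollingBounceUpgradeExample {
+
+            public static void main(final String[] args) {
+                Properties props = new Properties();
+                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");        // placeholder application id
+                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); // adjust to your brokers
+                // first rolling bounce: keep rebalance metadata compatible with still-running 0.10.0.x instances
+                props.put("upgrade.from", "0.10.0");
+
+                KStreamBuilder builder = new KStreamBuilder();
+                builder.stream("input-topic").to("output-topic");                        // placeholder topology
+
+                KafkaStreams streams = new KafkaStreams(builder, props);
+                streams.start();
+                // second rolling bounce: remove the "upgrade.from" entry above and bounce each instance once more
+            }
+        }
+    </pre>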
+    <p> Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (rolling bounce upgrade is not supported) </p>
+    <ul>
+        <li> stop all old (0.10.0.x) application instances </li>
+        <li> update your code and swap old code and jar file with new code and new jar file </li>
+        <li> restart all new (0.11.0.0, 0.11.0.1, or 0.11.0.2) application instances </li>
+    </ul>
 
     <h3><a id="streams_api_changes_0110" href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
 
@@ -128,7 +145,7 @@
         Parameter updates in <code>StreamsConfig</code>:
     </p>
     <ul>
-        <li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application does no longer interact with ZooKeeper for topic management but uses the new broker admin protocol
+        <li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol
             (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-TopicAdminSchema.1">KIP-4, Section "Topic Admin Schema"</a>) </li>
         <li> added many new parameters for metrics, security, and client configurations </li>
     </ul>
diff --git a/0110/upgrade.html b/0110/upgrade.html
index 5f4547a..6fbbb50 100644
--- a/0110/upgrade.html
+++ b/0110/upgrade.html
@@ -65,13 +65,18 @@
 </ol>
 
 <h5><a id="upgrade_1100_streams" href="#upgrade_1100_streams">Upgrading a 0.10.2 Kafka Streams Application</a></h5>
-<ul>
+ <ul>
     <li> Upgrading your Streams application from 0.10.2 to 0.11.0 does not require a broker upgrade.
-        A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+         A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
     <li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to switch to their replacement configuration parameters, as these configs are deprecated (see the sketch below). </li>
     <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> for more details. </li>
-</ul>
+ </ul>
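+<p>
+    A minimal sketch of that recommended change for the serde configs is shown below (the serde choices and class name are
+    illustrative only; the point is the switch from the deprecated keys to the <code>default.</code>-prefixed keys):
+</p>
+<pre class="brush: java;">
+    import java.util.Properties;
+
+    import org.apache.kafka.common.serialization.Serdes;
+    import org.apache.kafka.streams.StreamsConfig;
+
+    public class SerdeConfigMigration {
+
+        public static void main(final String[] args) {
+            Properties props = new Properties();
+            // deprecated in 0.11.0:
+            //   props.put("key.serde", Serdes.String().getClass());
+            //   props.put("value.serde", Serdes.String().getClass());
+            // recommended replacements:
+            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        }
+    }
+</pre>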
 
+<h5><a id="upgrade_1103_notable" href="#upgrade_1103_notable">Notable changes in 0.11.0.3</a></h5>
+<ul>
+    <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+    <li> See the <a href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams upgrade guide</b></a> for details about this new config. </li>
+</ul>
 
 <h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes in 0.11.0.0</a></h5>
 <ul>
@@ -177,7 +182,7 @@
   is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications
   are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support
   the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion.
-  Note that 0.11.0 consumers support backwards compatibility with 0.10.0 brokers and upward, so it is possible to upgrade the
+  Note that 0.11.0 consumers support backwards compatibility with 0.10.0 brokers and upward, so it is possible to upgrade the
   clients first before the brokers. 
 </p>
 
@@ -220,7 +225,36 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
     <li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
     <li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
     <li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
-    <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
+    <li> See <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
+</ul>
+
+<h5><a id="upgrade_1020_streams_from_0100" href="#upgrade_1020_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.0 to 0.10.2 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers. </li>
+    <li> There are a couple of API changes that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details).
+         Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <li> Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    <li> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.2.0 or 0.10.2.1) application instances </li>
+        </ul>
+    </li>
+</ul>
+
+<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable changes in 0.10.2.2</a></h5>
+<ul>
+    <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
 </ul>
 
 <h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable changes in 0.10.2.1</a></h5>
@@ -229,8 +263,6 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
   </li>
 </ul>
 
-
-
 <h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes in 0.10.2.0</a></h5>
 <ul>
     <li>The Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients
@@ -301,8 +333,27 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
 <h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
 <ul>
     <li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
-    <li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
+    <li> There are a couple of API changes that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
          Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+<!-- TODO: add this when 0.10.1.2 is released
+    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+-->
+    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+        </ul>
+    </li>
 </ul>
 
 <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
diff --git a/0110/uses.html b/0110/uses.html
index f1c8407..bf134fc 100644
--- a/0110/uses.html
+++ b/0110/uses.html
@@ -15,7 +15,7 @@
  limitations under the License.
 -->
 
-<p> Here is a description of a few of the popular use cases for Apache Kafka&reg;.
+<p> Here is a description of a few of the popular use cases for Apache Kafka&trade;.
 For an overview of a number of these areas in action, see <a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/">this blog post</a>. </p>
 
 <h4><a id="uses_messaging" href="#uses_messaging">Messaging</a></h4>
diff --git a/downloads.html b/downloads.html
index eef7775..318417d 100644
--- a/downloads.html
+++ b/downloads.html
@@ -127,6 +127,28 @@
             For more information, please read the detailed <a href="https://www.apache.org/dist/kafka/1.0.0/RELEASE_NOTES.html">Release Notes</a>.
         </p>
 
+        <span id="0.11.0.3"></span>
+        <h3 class="download-version">0.11.0.3<a href="#0.11.0.3"><i class="fas fa-link " style="color:#053ce2"></i></a></h3>
+        <ul>
+            <li>
                Released July 2nd, 2018
+            </li>
+            <li>
+                <a href="https://www.apache.org/dist/kafka/0.11.0.3/RELEASE_NOTES.html">Release Notes</a>
+            </li>
+            <li>
+                Source download: <a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.3/kafka-0.11.0.3-src.tgz">kafka-0.11.0.3-src.tgz</a> (<a href="https://www.apache.org/dist/kafka/0.11.0.3/kafka-0.11.0.3-src.tgz.asc">asc</a>, <a href="https://www.apache.org/dist/kafka/0.11.0.3/kafka-0.11.0.3-src.tgz.sha512">sha512</a>)
+            </li>
+            <li>
+                Binary downloads:
+                <ul>
+                    <li>Scala 2.11 &nbsp;- <a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.3/kafka_2.11-0.11.0.3.tgz">kafka_2.11-0.11.0.3.tgz</a> (<a href="https://www.apache.org/dist/kafka/0.11.0.3/kafka_2.11-0.11.0.3.tgz.asc">asc</a>, <a href="https://www.apache.org/dist/kafka/0.11.0.3/kafka_2.11-0.11.0.3.tgz.sha512">sha512</a>)</li>
+                    <li>Scala 2.12 &nbsp;- <a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.3/kafka_2.12-0.11.0.3.tgz">kafka_2.12-0.11.0.3.tgz</a> (<a href="https://www.apache.org/dist/kafka/0.11.0.3/kafka_2.12-0.11.0.3.tgz.asc">asc</a>, <a href="https://www.apache.org/dist/kafka/0.11.0.3/kafka_2.12-0.11.0.3.tgz.sha512">sha512</a>)</li>
+                </ul>
+                We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.11 is recommended).
+            </li>
+        </ul>
+
         <span id="0.11.0.2"></span>
         <h3 class="download-version">0.11.0.2<a href="#0.11.0.2"><i class="fas fa-link " style="color:#053ce2"></i></a></h3>
         <ul>

