kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/3] kafka-site git commit: Port changes from PR4017 and PR3862 to 0110
Date Thu, 12 Oct 2017 19:09:15 GMT
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/97a78e3b/0110/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/0110/streams/developer-guide.html b/0110/streams/developer-guide.html
index 15298a7..c74ca6e 100644
--- a/0110/streams/developer-guide.html
+++ b/0110/streams/developer-guide.html
@@ -18,8 +18,67 @@
 <script><!--#include virtual="../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-    <h1>Developer Manual</h1>
+    <h1>Developer Guide for Kafka Streams API</h1>
+    
+    <p>
+        This developer guide describes how to write, configure, and execute a Kafka Streams application. There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program written with the Kafka Streams library.
+    </p>
+
+    <p>
+        The computational logic of a Kafka Streams application is defined as a <a href="/{{version}}/documentation/streams/core-concepts#streams_topology">processor topology</a>. Kafka Streams provides two sets of APIs to define the processor topology: the Low-Level Processor API and the High-Level Streams DSL.
+    </p>
 
+    <ul class="toc">
+        <li><a href="#streams_processor">1. Low-level Processor API</a>
+            <ul>
+                <li><a href="#streams_processor_process">1.1 Processor</a>
+                <li><a href="#streams_processor_topology">1.2 Processor Topology</a>
+                <li><a href="#streams_processor_statestore">1.3 State Stores</a>
+                <li><a href="#restoration_progress">1.4 Monitoring the Restoration Progress of Fault-tolerant State Stores</a>
+                <li><a href="#disable-changelogs">1.5 Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a>
+                <li><a href="#implementing-custom-state-stores">1.6 Implementing Custom State Stores</a>
+                <li><a href="#connecting-processors-and-state-stores">1.7 Connecting Processors and State Stores</a>
+                <li><a href="#streams_processor_describe">1.8 Describe a Topology</a>
+            </ul>
+        </li>
+        <li><a href="#streams_dsl">2. High-Level Streams DSL</a>
+            <ul>
+                <li><a href="#streams_duality">2.1 Duality of Streams and Tables</a>
+                <li><a href="#streams_dsl_source">2.2 Creating Source Streams from Kafka</a>
+                <li><a href="#streams_dsl_transform">2.3 Transform a stream</a>
+                <li><a href="#streams_dsl_sink">2.4 Write streams back to Kafka</a>
+                <li><a href="#streams_dsl_build">2.5 Generate the processor topology</a>
+            </ul>
+        </li>
+        <li><a href="#streams_interactive_queries">3. Interactive Queries</a>
+            <ul>
+                <li><a href="#streams_developer-guide_interactive-queries_your_app">3.1 Your application and interactive queries</a>
+                <li><a href="#streams_developer-guide_interactive-queries_local-stores">3.2 Querying local state stores (for an application instance)</a>
+                <li><a href="#streams_developer-guide_interactive-queries_local-key-value-stores">3.3 Querying local key-value stores</a>
+                <li><a href="#streams_developer-guide_interactive-queries_local-window-stores">3.4 Querying local window stores</a>
+                <li><a href="#streams_developer-guide_interactive-queries_custom-stores">3.5 Querying local custom state stores</a>
+                <li><a href="#streams_developer-guide_interactive-queries_discovery">3.6 Querying remote state stores (for the entire application)</a>
+                <li><a href="#streams_developer-guide_interactive-queries_rpc-layer">3.7 Adding an RPC layer to your application</a>
+                <li><a href="#streams_developer-guide_interactive-queries_expose-rpc">3.8 Exposing the RPC endpoints of your application</a>
+                <li><a href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">3.9 Discovering and accessing application instances and their respective local state stores</a>
+            </ul>
+        </li>
+        <li><a href="#streams_developer-guide_memory-management">4. Memory Management</a>
+            <ul>
+                <li><a href="#streams_developer-guide_memory-management_record-cache">4.1 Record caches in the DSL</a>
+                <li><a href="#streams_developer-guide_memory-management_state-store-cache">4.2 State store caches in the Processor API</a>
+                <li><a href="#streams_developer-guide_memory-management_other_memory_usage">4.3 Other memory usage</a>
+            </ul>
+        </li>
+        <li><a href="#streams_configure_execute">5. Application Configuration and Execution</a>
+            <ul>
+                <li><a href="#streams_client_config">5.1 Producer and Consumer Configuration</a>
+                <li><a href="#streams_broker_config">5.2 Broker Configuration</a>
+                <li><a href="#streams_topic_config">5.3 Internal Topic Configuration</a>
+                <li><a href="#streams_execute">5.4 Executing Your Kafka Streams Application</a>
+            </ul>
+        </li>
+    </ul>
     <p>
         There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that provides how to run a stream processing program coded in the Kafka Streams library.
         This section focuses on how to write, configure, and execute a Kafka Streams application.
@@ -35,19 +94,17 @@
     <h4><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h4>
 
     <p>
-        As mentioned in the <a href="/{{version}}/documentation/streams/core-concepts"><b>Core Concepts</b></a> section, a stream processor is a node in the processor topology that represents a single processing step.
-        With the <code>Processor</code> API developers can define arbitrary stream processors that process one received record at a time, and connect these processors with
+        A <a href="/{{version}}/documentation/streams/core-concepts"><b>stream processor</b></a> is a node in the processor topology that represents a single processing step.
+        With the <code>Processor</code> API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with
         their associated state stores to compose the processor topology that represents their customized processing logic.
     </p>
 
     <p>
-        The <code>Processor</code> interface provides two main API methods:
-        <code>process</code> and <code>punctuate</code>. The <code>process</code> method is performed on each
-        of the received record; and the <code>punctuate</code> method is performed periodically based on elapsed time.
-        In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the
-        <code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
-        forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current
-        processing progress (<code>context().commit</code>), etc.
+        The <code>Processor</code> interface provides the <code>process</code> method API, which is performed on each record that is received.
+        The processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the <code>init</code> method
+        and use the context to schedule a periodically called punctuation function (<code>context().schedule</code>),
+        to forward the new or modified key-value pair to downstream processors (<code>context().forward</code>),
+        to commit the current processing progress (<code>context().commit</code>), and so on.
     </p>
 
     <p>
@@ -65,8 +122,26 @@
     // keep the processor context locally because we need it in punctuate() and commit()
     this.context = context;
 
-    // call this processor's punctuate() method every 1000 milliseconds.
-    this.context.schedule(1000);
+    // schedule a punctuation method every 1000 milliseconds.
+    this.context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+        @Override
+        public void punctuate(long timestamp) {
+            // inside the anonymous Punctuator, "this" refers to the Punctuator itself,
+            // so the enclosing processor's kvStore field is accessed without "this."
+            KeyValueIterator&lt;String, Long&gt; iter = kvStore.all();
+
+            while (iter.hasNext()) {
+                KeyValue&lt;String, Long&gt; entry = iter.next();
+                context.forward(entry.key, entry.value.toString());
+            }
+
+            // it is the caller's responsibility to close the iterator on the state store;
+            // otherwise it may lead to memory and file handle leaks depending on the
+            // underlying state store implementation.
+            iter.close();
+
+            // commit the current processing progress
+            context.commit();
+        }
+    });
 
     // retrieve the key-value store named "Counts"
     this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
@@ -96,7 +171,7 @@
         context.forward(entry.key, entry.value.toString());
     }
 
-    iter.close();
+    iter.close(); // avoid OOM
     // commit the current processing progress
     context.commit();
     }
@@ -111,27 +186,28 @@
     </pre>
 
     <p>
-        In the above implementation, the following actions are performed:
+        In the previous example, the following actions are performed:
     </p>
 
     <ul>
         <li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li>
         <li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li>
-        <li>In the <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
+        <li>In the scheduled <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
+        <li>When done with the <code>KeyValueIterator&lt;String, Long&gt;</code>, you <em>must</em> close the iterator, either explicitly as shown above or by using a try-with-resources statement.</li>
     </ul>
 
 
     <h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
 
     <p>
-        With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
+        With the customized processors defined in the Processor API, you can use <code>Topology</code> to build a processor topology
         by connecting these processors together:
     </p>
 
     <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+    Topology topology = new Topology();
 
-    builder.addSource("SOURCE", "src-topic")
+    topology.addSource("SOURCE", "src-topic")
     // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
     .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
 
@@ -154,63 +230,257 @@
     .addSink("SINK3", "sink-topic3", "PROCESS3");
     </pre>
 
-    There are several steps in the above code to build the topology, and here is a quick walk through:
+    Here is a quick walk-through of how the previous code builds the topology:
 
     <ul>
-        <li>First of all a source node named "SOURCE" is added to the topology using the <code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li>
-        <li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.</li>
-        <li>Finally three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
+        <li>A source node (<code>"SOURCE"</code>) is added to the topology using the <code>addSource</code> method, with one Kafka topic (<code>"src-topic"</code>) fed to it.</li>
+        <li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the source node, but is the parent of the other two processors.</li>
+        <li>Three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
     </ul>
 
-    <h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State Stores</a></h4>
+<h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State Stores</a></h4>
+
+<p>
+To make state stores fault-tolerant (e.g., to recover from machine crashes) as well as to allow for state store migration without data loss (e.g., to migrate a stateful stream task from one machine to another when elastically adding or removing capacity from your application), a state store can be <strong>continuously backed up</strong> to a Kafka topic behind the scenes. 
+We sometimes refer to this topic as the state store's associated <em>changelog topic</em> or simply its <em>changelog</em>. 
+In the case of a machine failure, for example, the state store and thus the application's state can be fully restored from its changelog. 
+You can enable or disable this backup feature for a state store, and thus its fault tolerance.
+</p>
+
+<p>
+By default, persistent <strong>key-value stores</strong> are fault-tolerant. 
+They are backed by a <a href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic. 
+The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.
+</p>
+
+<p>
+Similarly, persistent <strong>window stores</strong> are fault-tolerant. 
+They are backed by a topic that uses both <em>compaction</em> and <em>deletion</em>. 
+Using deletion in addition to compaction is required for the changelog topics of window stores because of the structure of the message keys that are being sent to the changelog topics: for window stores, the message keys are composite keys that include not only the &quot;normal&quot; key but also window timestamps. 
+For such composite keys it would not be sufficient to enable just compaction in order to prevent a changelog topic from growing out of bounds. 
+With deletion enabled, old windows that have expired will be cleaned up by Kafka's log cleaner as the log segments expire. 
+The default retention setting is <code>Windows#maintainMs()</code> + 1 day. This setting can be overridden by specifying <code>StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</code> in the <code>StreamsConfig</code>.
+</p>
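As a rough worked example of the default retention arithmetic described above, the sketch below adds one day to a window retention value. The class and method names are hypothetical, and the 6-hour value merely stands in for whatever `Windows#maintainMs()` returns in a real application.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class WindowChangelogRetentionSketch {
    // Default additional retention described in the text: 1 day, in milliseconds.
    static final long DEFAULT_ADDITIONAL_RETENTION_MS = TimeUnit.DAYS.toMillis(1);

    // maintainMs stands for the value returned by Windows#maintainMs().
    static long changelogRetentionMs(long maintainMs, long additionalRetentionMs) {
        return maintainMs + additionalRetentionMs;
    }

    public static void main(String[] args) {
        // Assumed window retention of 6 hours, chosen only for the example.
        long maintainMs = TimeUnit.HOURS.toMillis(6);
        System.out.println(changelogRetentionMs(maintainMs, DEFAULT_ADDITIONAL_RETENTION_MS));
    }
}
```

With a 6-hour window retention, the changelog retention comes to 21,600,000 ms + 86,400,000 ms = 108,000,000 ms.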
+
+<p>
+One additional note regarding the use of state stores: any time you open an <code>Iterator</code> from a state store, you <em>must</em> call <code>close()</code> on the iterator
+when you are done working with it to reclaim resources. Alternatively, you can use the iterator within a try-with-resources statement.
+    If you do not close an iterator, you are likely to run into an out-of-memory (OOM) error.
+</p>
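The try-with-resources pattern mentioned above can be sketched without any Kafka dependency. `ClosableCountIterator` below is a hypothetical stand-in for a store iterator; it only mimics the fact that `KeyValueIterator` is both an `Iterator` and closeable, so treat it as an illustration of the pattern rather than of the real store API.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a state store iterator: it is both an Iterator
// and AutoCloseable, which is what lets try-with-resources close it.
final class ClosableCountIterator implements Iterator<String>, AutoCloseable {
    private final Iterator<String> delegate;
    private boolean closed = false;

    ClosableCountIterator(List<String> entries) {
        this.delegate = entries.iterator();
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public String next() { return delegate.next(); }

    // A real store iterator would release memory or file handles here;
    // this stand-in only records that close() was called.
    @Override public void close() { closed = true; }

    boolean isClosed() { return closed; }
}

final class IteratorCloseSketch {
    // Appends every entry to 'out' and returns the iterator so callers can inspect it.
    static ClosableCountIterator drain(List<String> entries, StringBuilder out) {
        ClosableCountIterator iter = new ClosableCountIterator(entries);
        try (ClosableCountIterator it = iter) {
            while (it.hasNext()) {
                out.append(it.next()).append(';');
            }
        } // it.close() runs here, even if the loop body throws
        return iter;
    }
}
```

The same shape applies to an iterator obtained from a state store: declaring it in the `try (...)` header means there is no separate `close()` call to forget.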
 
-    <p>
-        Note that the <code>Processor</code> API is not limited to only accessing the current records as they arrive in the <code>process()</code> method, but can also maintain processing states
-        that keep recently arrived records to use in stateful processing operations such as windowed joins or aggregation.
-        To take advantage of these states, users can define a state store by implementing the <code>StateStore</code> interface (the Kafka Streams library also has a few extended interfaces such as <code>KeyValueStore</code>);
-        in practice, though, users usually do not need to customize such a state store from scratch but can simply use the <code>Stores</code> factory to define a state store by specifying whether it should be persistent, log-backed, etc.
-        In the following example, a persistent key-value store named "Counts" with key type <code>String</code> and value type <code>Long</code> is created.
-    </p>
 
-    <pre class="brush: java;">
-    StateStoreSupplier countStore = Stores.create("Counts")
-    .withKeys(Serdes.String())
-    .withValues(Serdes.Long())
-    .persistent()
-    .build();
-    </pre>
+<h4><a id="restoration_progress" href="#restoration_progress">Monitoring the Restoration Progress of Fault-tolerant State Stores</a></h4>
+
+<p>
+When starting up your application, fault-tolerant state stores usually don't need a restoration process because the persisted state is read from local disk. 
+But there could be situations when a full restore from the backing changelog topic is required (e.g., a failure wiped out the local state or your application runs in a stateless environment and persisted data is lost on re-starts).
+</p>
+
+<p>
+If you have a significant amount of data in the changelog topic, the restoration process could take a non-negligible amount of time. 
+Given that processing of new data won't start until the restoration process is completed, having a window into the progress of restoration is useful.
+</p>
+
+<p>
+To observe the restoration of all state stores, provide your application with an instance of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface. 
+You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> by calling the <code>KafkaStreams#setGlobalStateRestoreListener</code> method.
+</p>
 
-    <p>
-        To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
-        processor topology to create the local state and associate it with the processor nodes that needs to access it; or they can connect a created
-        state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
-    </p>
+<p>
+ A basic implementation example that prints restoration status to the console:
+</p>
 
-    <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+<pre class="brush: java;">
+  import org.apache.kafka.common.TopicPartition;
+  import org.apache.kafka.streams.processor.StateRestoreListener;
 
-    builder.addSource("SOURCE", "src-topic")
+   public class ConsoleGlobalRestoreListener implements StateRestoreListener {
 
-    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
-    // add the created state store "COUNTS" associated with processor "PROCESS1"
-    .addStateStore(countStore, "PROCESS1")
-    .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
-    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+      @Override
+      public void onRestoreStart(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long startingOffset,
+                                 final long endingOffset) {
 
-    // connect the state store "COUNTS" with processor "PROCESS2"
-    .connectProcessorAndStateStores("PROCESS2", "COUNTS");
+          System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
+          System.out.println(" total records to be restored " + (endingOffset - startingOffset));
+      }
+
+      @Override
+      public void onBatchRestored(final TopicPartition topicPartition,
+                                  final String storeName,
+                                  final long batchEndOffset,
+                                  final long numRestored) {
 
-    .addSink("SINK1", "sink-topic1", "PROCESS1")
-    .addSink("SINK2", "sink-topic2", "PROCESS2")
-    .addSink("SINK3", "sink-topic3", "PROCESS3");
-    </pre>
+          System.out.println("Restored batch of " + numRestored + " records for " + storeName + " partition " + topicPartition.partition());
+
+      }
+
+      @Override
+      public void onRestoreEnd(final TopicPartition topicPartition,
+                               final String storeName,
+                               final long totalRestored) {
+
+          System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
+      }
+  }
+</pre>
+
+<blockquote>
+<p>
+  The <code>StateRestoreListener</code> instance is shared across all <code>org.apache.kafka.streams.processor.internals.StreamThread</code> instances and it is assumed all methods are stateless. 
+  If any stateful operations are desired, then the user will need to provide synchronization internally.
+</p>
+</blockquote>
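If a listener does need to keep state, one way to handle the synchronization is to hold the state in concurrent collections. The sketch below is a hypothetical helper, not part of Kafka Streams: a listener's `onBatchRestored` callback could delegate to `recordBatch`, passing the store name, `topicPartition.partition()`, and `numRestored`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for illustration: accumulates restored-record counts per
// store partition in a way that is safe to call from several threads at once.
final class RestoreProgressTracker {
    private final ConcurrentMap<String, AtomicLong> restored = new ConcurrentHashMap<>();

    // Intended to be called from a listener's onBatchRestored(...) callback.
    void recordBatch(String storeName, int partition, long numRestored) {
        restored.computeIfAbsent(key(storeName, partition), k -> new AtomicLong())
                .addAndGet(numRestored);
    }

    // Total records restored so far for the given store partition (0 if none).
    long totalRestored(String storeName, int partition) {
        AtomicLong count = restored.get(key(storeName, partition));
        return count == null ? 0L : count.get();
    }

    private static String key(String storeName, int partition) {
        return storeName + "-" + partition;
    }
}
```

`ConcurrentHashMap` plus `AtomicLong` avoids explicit locks here; a `synchronized` block around a plain map would also work and may be simpler when several fields must change together.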
+
+<h4> <a id="disable-changelogs" href="#disable-changelogs">Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a></h4>
+
+<p>
+    You can enable or disable fault tolerance for a state store by enabling or disabling, respectively, the changelogging of the store through <code>StoreBuilder#withLoggingEnabled(Map&lt;String, String&gt;)</code>
+    and <code>StoreBuilder#withLoggingDisabled()</code>.
+    You can also fine-tune the associated topic's configuration if needed.
+</p>
+
+<p>Example for disabling fault-tolerance:</p>
+
+<pre class="brush: java;">
+
+  import org.apache.kafka.common.serialization.Serdes;
+  import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+  import org.apache.kafka.streams.state.KeyValueStore;
+  import org.apache.kafka.streams.state.StoreBuilder;
+  import org.apache.kafka.streams.state.Stores;
+
+  KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
+  StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; builder =
+      Stores.keyValueStoreBuilder(countStoreSupplier, Serdes.String(), Serdes.Long())
+            .withLoggingDisabled(); // disable backing up the store to a changelog topic
+
+</pre>
+
+<blockquote>
+<p>If the changelog is disabled, the attached state store is no longer fault-tolerant and cannot have any standby replicas.</p>
+</blockquote>
+
+<p>
+   Example for enabling fault tolerance, with additional changelog-topic configuration: you can add any log config 
+   from <code>kafka.log.LogConfig</code> (<code>core/src/main/scala/kafka/log/LogConfig.scala#L61</code>). Unrecognized configs will be ignored.
+</p>
+
+<pre class="brush: java;">
+
+  import java.util.HashMap;
+  import java.util.Map;
+
+  import org.apache.kafka.common.serialization.Serdes;
+  import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+  import org.apache.kafka.streams.state.KeyValueStore;
+  import org.apache.kafka.streams.state.StoreBuilder;
+  import org.apache.kafka.streams.state.Stores;
+
+  Map&lt;String, String&gt; changelogConfig = new HashMap&lt;&gt;();
+  // override min.insync.replicas
+  changelogConfig.put("min.insync.replicas", "1");
+
+  KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
+  StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; builder =
+      Stores.keyValueStoreBuilder(countStoreSupplier, Serdes.String(), Serdes.Long())
+            .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings
+
+
+</pre>
+
+<h4><a id="implementing-custom-state-stores" href="#implementing-custom-state-stores">Implementing Custom State Stores</a></h4>
+
+<p>
+ Apart from using the built-in state store types, you can also implement your own. 
+ The primary interface to implement for the store is <code>org.apache.kafka.streams.processor.StateStore</code>. 
+ Beyond that, Kafka Streams also has a few extended interfaces such as <code>KeyValueStore</code>.
+</p>
+
+<p>
+  In addition to the actual store, you also need to provide a &quot;factory&quot; for the store by implementing the <code>org.apache.kafka.streams.state.StoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
+</p>
+
+<p>
+  You also have the option of providing an <code>org.apache.kafka.streams.processor.StateRestoreCallback</code> instance used to restore the state store from its backing changelog topic. 
+  This is done via the <code>org.apache.kafka.streams.processor.ProcessorContext#register</code> call inside the <code>StateStore#init</code> call.
+</p>
+
+<pre class="brush: java;">
+  public void init(ProcessorContext context, StateStore store) {
+     context.register(store, false, stateRestoreCallbackInstance);
+  }
+</pre>
+
+<p>
+  There is an additional interface <code>org.apache.kafka.streams.processor.BatchingStateRestoreCallback</code> that provides bulk restoration semantics vs. the single record-at-a-time restoration semantics offered by the <code>StateRestoreCallback</code> interface.
+</p>
+
+<p>
+  Additionally, there are two abstract classes that implement <code>StateRestoreCallback</code> or <code>BatchingStateRestoreCallback</code> in conjunction with the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface (<code>org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback</code> and <code>org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback</code>, respectively) that allow the state store to receive notification of restoration progress for that store. 
+  The <code>StateRestoreListener</code> in this case is per state store instance and is used for internal purposes such as updating config settings based on the status of the restoration process.
+</p>
+
+<h4><a id="connecting-processors-and-state-stores" href="#connecting-processors-and-state-stores">Connecting Processors and State Stores</a></h4>
+
+<p>
+Now that we have defined a processor (<code>WordCountProcessor</code>) and the state stores, we can construct the processor topology by connecting these processors and state stores together using the <code>Topology</code> instance. 
+In addition, users can add <em>source processors</em> with the specified Kafka topics to generate input data streams into the topology, and <em>sink processors</em> with the specified Kafka topics to generate output data streams out of the topology.
+</p>
+
+<pre class="brush: java;">
+       Topology topology = new Topology();
+
+      // add the source processor node that takes Kafka topic "source-topic" as input
+      topology.addSource("Source", "source-topic")
+
+      // add the WordCountProcessor node which takes the source processor as its upstream processor
+      .addProcessor("Process", () -> new WordCountProcessor(), "Source")
+
+      // add the count store associated with the WordCountProcessor processor
+      .addStateStore(countStoreSupplier, "Process")
+
+      // add the sink processor node that takes Kafka topic "sink-topic" as output
+      // and the WordCountProcessor node as its upstream processor
+      .addSink("Sink", "sink-topic", "Process");
+</pre>
+
+<p>There are several steps in the above implementation to build the topology, and here is a quick walk-through:</p>
+<ul>
+   <li>A source processor node named &quot;Source&quot; is added to the topology using the <code>addSource</code> method, with one Kafka topic &quot;source-topic&quot; fed to it.</li>
+   <li>A processor node named &quot;Process&quot; with the pre-defined <code>WordCountProcessor</code> logic is then added as the downstream processor of the &quot;Source&quot; node using the <code>addProcessor</code> method.</li>
+   <li>A predefined persistent key-value state store is created and associated with the &quot;Process&quot; node, using <code>countStoreSupplier</code>.</li>
+   <li>A sink processor node is then added to complete the topology using the <code>addSink</code> method, taking the &quot;Process&quot; node as its upstream processor and writing to a separate &quot;sink-topic&quot; Kafka topic.</li>
+</ul>
+
+<p>
+In this topology, the &quot;Process&quot; stream processor node is considered a downstream processor of the &quot;Source&quot; node, and an upstream processor of the &quot;Sink&quot; node. 
+As a result, whenever the &quot;Source&quot; node forwards a newly fetched record from Kafka to its downstream &quot;Process&quot; node, the <code>WordCountProcessor#process()</code> method is triggered to process the record and update the associated state store; and whenever <code>context#forward()</code> is called in the <code>Punctuator#punctuate()</code> method, the aggregate key-value pair will be sent via the &quot;Sink&quot; processor node to the Kafka topic &quot;sink-topic&quot;.
+Note that in the <code>WordCountProcessor</code> implementation, you need to refer to the same store name &quot;Counts&quot; when accessing the key-value store; otherwise an exception will be thrown at runtime, indicating that the state store cannot be found. Likewise, if the state store itself is not associated with the processor in the <code>Topology</code> code, accessing it in the processor's <code>init()</code> method will throw an exception at runtime, indicating that the state store is not accessible from this processor.
+</p>
+
+
+    <h4><a id="streams_processor_describe" href="#streams_processor_describe">Describe a <code>Topology</code></a></h4>
+
+    <p>
+        After a <code>Topology</code> is specified, you can retrieve a description of the corresponding DAG via <code>Topology#describe()</code>, which returns a <code>TopologyDescription</code>.
+        A <code>TopologyDescription</code> contains all added source, processor, and sink nodes as well as all attached stores.
+        You can access the specified input and output topic names and patterns for source and sink nodes.
+        For processor nodes, the attached stores are added to the description.
+        Additionally, all nodes have a list of all their connected successor and predecessor nodes.
+        Thus, <code>TopologyDescription</code> lets you retrieve the DAG structure of the specified topology.
+        <br />
+        Note that global stores are listed explicitly because they are accessible by all nodes without the need to explicitly connect them.
+        Furthermore, nodes are grouped by <code>Sub-topologies</code>, where each sub-topology is a group of processor nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
+        During execution, each <code>Sub-topology</code> will be processed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
+        Thus, each <code>Sub-topology</code> describes an independent unit of work that can be executed by different threads in parallel.
+        <br />
+        Describing a <code>Topology</code> before starting your streams application with the specified topology is helpful to reason about tasks and thus maximum parallelism (we will talk about how to execute your written application later in this section).
+        It is also helpful to get insight into a <code>Topology</code> if it is not specified directly as described above but via the Kafka Streams DSL (we will describe the DSL in the next section).
+    </p>
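To make the sub-topology grouping rule above more concrete, here is a small union-find sketch that groups nodes connected directly or through a shared store. It is an illustration only: the class and method names are hypothetical, and it makes no claim about how Kafka Streams computes sub-topologies internally.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustration only; not Kafka Streams' internal implementation.
final class SubTopologyGroupingSketch {
    // Maps each node to its parent in the union-find forest; a root maps to itself.
    private final Map<String, String> parent = new HashMap<>();

    void addNode(String node) {
        parent.putIfAbsent(node, node);
    }

    // Joins two nodes that are directly connected or that share a store.
    // Nodes that only communicate through a topic must NOT be connected here.
    void connect(String a, String b) {
        addNode(a);
        addNode(b);
        parent.put(find(a), find(b));
    }

    // Follows parent links up to the root; the node must have been added before.
    private String find(String node) {
        String root = node;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        return root;
    }

    boolean sameSubTopology(String a, String b) {
        return find(a).equals(find(b));
    }

    int subTopologyCount() {
        Set<String> roots = new HashSet<>();
        for (String node : parent.keySet()) {
            roots.add(find(node));
        }
        return roots.size();
    }
}
```

For example, connecting "Source" to "Process" and "Process" to "Sink" yields one group, while a second source and sink pair that is linked to the first only through a topic forms its own group and can therefore be processed by separate tasks.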
 
     In the next section we present another way to build the processor topology: the Kafka Streams DSL.
     <br>
 
     <h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
 
-    To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code> class, which is extended from the <code>TopologyBuilder</code>.
+    To build a <code>Topology</code> using the Streams DSL, developers can apply the <code>StreamsBuilder</code> class.
     A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
     through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example source
     code for details.
@@ -264,25 +534,1438 @@
     ("alice", 1) --> ("alice", 3)
     </pre>
 
-    If these records a KStream and the stream processing application were to sum the values it would return <code>4</code>. If these records were a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered as an update.
+    If the stream is defined as a KStream and the stream processing application were to sum the values it would return <code>4</code>. If the stream is defined as a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered as an update.
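+    <p>
+    The difference between the two interpretations can be sketched in plain Java, without the Kafka Streams API. The class <code>StreamTableSumSketch</code> and its
+    method names are hypothetical and only mimic the semantics described above: a record stream counts every record, whereas a changelog keeps only the latest value per key.
+    </p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of the Kafka Streams API.
public class StreamTableSumSketch {
    // Record-stream view: every record is an independent fact, so all values count.
    public static long sumAsRecordStream(List<Map.Entry<String, Long>> records) {
        long sum = 0;
        for (Map.Entry<String, Long> record : records) {
            sum += record.getValue();
        }
        return sum;
    }

    // Changelog view: a later record with the same key replaces the earlier one.
    public static long sumAsChangelog(List<Map.Entry<String, Long>> records) {
        Map<String, Long> latest = new HashMap<>();
        for (Map.Entry<String, Long> record : records) {
            latest.put(record.getKey(), record.getValue());
        }
        long sum = 0;
        for (long value : latest.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("alice", 1L));
        records.add(new SimpleEntry<>("alice", 3L));
        System.out.println(sumAsRecordStream(records)); // prints 4
        System.out.println(sumAsChangelog(records));    // prints 3
    }
}
```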
+
+    <h4><a id="streams_dsl_source" href="#streams_dsl_source">Creating Source Streams from Kafka</a></h4>
 
-    <h4><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h4>
+    <p>
+    You can easily read data from Kafka topics into your application. We support the following operations.
+    </p>
+    <table class="data-table" border="1">
+        <tbody><tr>
+            <th>Reading from Kafka</th>
+            <th>Description</th>
+        </tr>
+        <tr>
+            <td><b>Stream</b>: input topic(s) &rarr; <code>KStream</code></td>
+            <td>Create a <code>KStream</code> from the specified Kafka input topic(s), interpreting the data as a record stream.
+                A <code>KStream</code> represents a partitioned record stream.
+                <p>
+                    Slightly simplified, in the case of a KStream, the local KStream instance of every application instance will be populated
+                    with data from only a <b>subset</b> of the partitions of the input topic. Collectively, i.e. across all application instances,
+                    all the partitions of the input topic will be read and processed.
+                </p>
+                <pre class="brush: java;">
+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.KStream;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    KStream&lt;String, Long&gt; wordCounts = builder.stream(
+                        "word-counts-input-topic" /* input topic */,
+                        Consumed.with(Serdes.String(), Serdes.Long())); // define key and value serdes
+                </pre>
+                When to provide serdes explicitly:
+                <ul>
+                    <li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
+                    <li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic(s) do not match
+                        the configured default serdes. </li>
+                </ul>
+                Several variants of <code>stream</code> exist to e.g. specify a regex pattern for input topics to read from.</td>
+        </tr>
+        <tr>
+            <td><b>Table</b>: input topic(s) &rarr; <code>KTable</code></td>
+            <td>
+                Reads the specified Kafka input topic into a <code>KTable</code>. The topic is interpreted as a changelog stream,
+                where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or
+                as DELETE (when the value is null) for that key.
+                <p>
+                    Slightly simplified, in the case of a KTable, the local KTable instance of every application instance will be populated
+                    with data from only a subset of the partitions of the input topic. Collectively, i.e. across all application instances, all
+                    the partitions of the input topic will be read and processed.
+                </p>
+                <p>
+                You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
+                When a name is provided, the table can be queried using <a href="#streams_interactive_queries">interactive queries</a>.
+                When a name is not provided, the table will not be queryable and an internal name will be provided for the state store.
+                </p>
+                <pre class="brush: java;">
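+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.common.utils.Bytes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.KTable;
+                    import org.apache.kafka.streams.kstream.Materialized;
+                    import org.apache.kafka.streams.state.KeyValueStore;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    KTable&lt;String, Long&gt; wordCounts = builder.table(
+                        "word-counts-input-topic", /* input topic */
+                        Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as(
+                            "word-counts-partitioned-store" /* table/store name */)
+                            .withKeySerde(Serdes.String()) /* key serde */
+                            .withValueSerde(Serdes.Long()) /* value serde */);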
+                </pre>
+
+                When to provide serdes explicitly:
+                <ul>
+                    <li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
+                    <li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
+                        match the configured default serdes.</li>
+                </ul>
+
+                Several variants of <code>table</code> exist to e.g. specify the <code>auto.offset.reset</code>
+                policy to be used when reading from the input topic.
+            </td>
+        <tr>
+            <td><b>Global Table</b>: input topic &rarr; <code>GlobalKTable</code></td>
+            <td>
+                Reads the specified Kafka input topic into a <code>GlobalKTable</code>. The topic is interpreted as a changelog stream, where records
+                with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not <code>null</code>) or as DELETE (when the
+                value is <code>null</code>) for that key.
+                <p>
+                    Slightly simplified, in the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be
+                    populated with data from all the partitions of the input topic. In other words, when using a global table, every application
+                    instance will get its own, full copy of the topic's data.
+                </p>
+                <p>
+                You may provide an optional name for the table (more precisely, for the internal state store that backs the table).
+                When a name is provided, the table can be queried using <a href="#streams_interactive_queries">interactive queries</a>.
+                When a name is not provided, the table will not be queryable and an internal name will be provided for the state store.
+                </p>
+                <pre class="brush: java;">
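+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.common.utils.Bytes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.GlobalKTable;
+                    import org.apache.kafka.streams.kstream.Materialized;
+                    import org.apache.kafka.streams.state.KeyValueStore;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    GlobalKTable&lt;String, Long&gt; wordCounts = builder.globalTable(
+                        "word-counts-input-topic", /* input topic */
+                        Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as(
+                            "word-counts-global-store" /* table/store name */)
+                            .withKeySerde(Serdes.String()) /* key serde */
+                            .withValueSerde(Serdes.Long()) /* value serde */);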
+                </pre>
+
+                When to provide serdes explicitly:
+                <ul>
+                    <li>If you do not specify serdes explicitly, the default serdes from the configuration are used.</li>
+                    <li>You must specify serdes explicitly if the key and/or value types of the records in the Kafka input topic do not
+                        match the configured default serdes.</li>
+                </ul>
+                Several variants of <code>globalTable</code> exist to e.g. specify explicit serdes.
+
+            </td>
+        </tbody>
+    </table>
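+    <p>
+    The changelog interpretation used by <code>KTable</code> and <code>GlobalKTable</code> can be sketched in plain Java. This is only an illustration of the UPSERT/DELETE
+    rule described in the table above, not how Kafka Streams materializes tables internally; the class <code>ChangelogSketch</code> and the method <code>materialize</code>
+    are hypothetical names.
+    </p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Kafka Streams API.
public class ChangelogSketch {
    // Applies changelog records in order: a non-null value is an UPSERT, a null value is a DELETE.
    public static Map<String, Long> materialize(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new TreeMap<>();
        for (Map.Entry<String, Long> record : changelog) {
            if (record.getValue() == null) {
                table.remove(record.getKey());
            } else {
                table.put(record.getKey(), record.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>("alice", 1L));             // INSERT alice
        changelog.add(new SimpleEntry<>("bob", 5L));               // INSERT bob
        changelog.add(new SimpleEntry<>("alice", 3L));             // UPDATE alice
        changelog.add(new SimpleEntry<String, Long>("bob", null)); // DELETE bob
        System.out.println(materialize(changelog)); // prints {alice=3}
    }
}
```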
+
+    <h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h4>
+    <p>
+    <code>KStream</code> and <code>KTable</code> support a variety of transformation operations. Each of these operations
+    can be translated into one or more connected processors in the underlying processor topology. Since <code>KStream</code>
+    and <code>KTable</code> are strongly typed, all these transformation operations are defined as generic functions where
+    users can specify the input and output data types.
+    </p>
+    <p>
+    Some <code>KStream</code> transformations may generate one or more <code>KStream</code> objects (e.g., filter and
+    map on <code>KStream</code> generate another <code>KStream</code>, while branch on <code>KStream</code> can generate
+    multiple <code>KStream</code> instances) while some others may generate a <code>KTable</code> object (e.g., aggregation) interpreted
+    as the changelog stream of the resulting relation. This allows Kafka Streams to continuously update the computed value upon arrival
+    of late records after it has already been produced to the downstream transformation operators. As for <code>KTable</code>,
+    all its transformation operations can only generate another <code>KTable</code> (though the Kafka Streams DSL does
+    provide a special function to convert a <code>KTable</code> representation into a <code>KStream</code>, which we will
+    describe later). Nevertheless, all these transformation methods can be chained together to compose a complex processor topology.
+    </p>
+    <p>
+    We describe these transformation operations in the following subsections, grouping them into two categories:
+    stateless and stateful transformations.
+    </p>
+    <h5><a id="streams_dsl_transformations_stateless" href="#streams_dsl_transformations_stateless">Stateless transformations</a></h5>
+    <p>
+    Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not
+    require a state store associated with the stream processor.
+    </p>
+    <table class="data-table" border="1">
+        <tbody><tr>
+            <th>Transformation</th>
+            <th>Description</th>
+        </tr>
+        <tr>
+            <td><b>Branch</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Branch (or split) a <code>KStream</code> based on the supplied predicates into one or more <code>KStream</code> instances.
+                </p>
+                <p>
+                Predicates are evaluated in order. A record is placed in one and only one output stream, on the first match:
+                if the n-th predicate evaluates to true, the record is placed in the n-th stream. If no predicate matches,
+                the record is dropped.
+                </p>
+                <p>
+                Branching is useful, for example, to route records to different downstream topics.
+                </p>
+                <pre class="brush: java;">
+                    KStream&lt;String, Long&gt; stream = ...;
+                    KStream&lt;String, Long&gt;[] branches = stream.branch(
+                            (key, value) -> key.startsWith("A"), /* first predicate  */
+                            (key, value) -> key.startsWith("B"), /* second predicate */
+                            (key, value) -> true                 /* third predicate  */
+                    );
+                    // KStream branches[0] contains all records whose keys start with "A"
+                    // KStream branches[1] contains all records whose keys start with "B"
+                    // KStream branches[2] contains all other records
+                    // Java 7 example: cf. `filter` for how to create `Predicate` instances
+            </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Filter</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                Evaluates a boolean function for each element and retains those for which the function returns true.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;String, Long&gt; stream = ...;
+                     KTable&lt;String, Long&gt; table = ...;
+                     // A filter that selects (keeps) only positive numbers
+                     // Java 8+ example, using lambda expressions
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filter((key, value) -> value > 0);
+
+                     // Java 7 example
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filter(
+                       new Predicate&lt;String, Long&gt;() {
+                         @Override
+                         public boolean test(String key, Long value) {
+                           return value > 0;
+                         }
+                       });
+
+                    // A filter on a KTable that materializes the result into a StateStore
+                    table.filter((key, value) -> value != 0, Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("filtered"));
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Inverse Filter</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                Evaluates a boolean function for each element and drops those for which the function returns true.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;String, Long&gt; stream = ...;
+
+                     // An inverse filter that discards any negative numbers or zero
+                     // Java 8+ example, using lambda expressions
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filterNot((key, value) -> value &lt;= 0);
+
+                     // Java 7 example
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filterNot(
+                      new Predicate&lt;String, Long&gt;() {
+                        @Override
+                        public boolean test(String key, Long value) {
+                            return value &lt;= 0;
+                        }
+                     });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>FlatMap</b>: <code>KStream &rarr; KStream </code></td>
+            <td>
+                <p>
+                Takes one record and produces zero, one, or more records. You can modify the record keys and values,
+                including their types.
+                </p>
+
+                <p>
+                Marks the stream for data re-partitioning: Applying a grouping or a join after <code>flatMap</code> will result in
+                re-partitioning of the records. If possible use <code>flatMapValues</code> instead, which will not cause data re-partitioning.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;Long, String> stream = ...;
+                     KStream&lt;String, Integer&gt; transformed = stream.flatMap(
+                         // Here, we generate two output records for each input record.
+                         // We also change the key and value types.
+                         // Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
+                         (key, value) -> {
+                             List&lt;KeyValue&lt;String, Integer&gt;&gt; result = new LinkedList&lt;&gt;();
+                             result.add(KeyValue.pair(value.toUpperCase(), 1000));
+                             result.add(KeyValue.pair(value.toLowerCase(), 9000));
+                             return result;
+                         }
+                     );
+                     // Java 7 example: cf. `map` for how to create `KeyValueMapper` instances
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>FlatMap (values only)</b>: <code>KStream &rarr; KStream </code></td>
+            <td>
+                <p>
+                Takes one record and produces zero, one, or more records, while retaining the key of the original record.
+                You can modify the record values and the value type.
+                </p>
+                <p>
+                <code>flatMapValues</code> is preferable to <code>flatMap</code> because it will not cause data re-partitioning. However,
+                it does not allow you to modify the key or key type like <code>flatMap</code> does.
+                </p>
+                <pre class="brush: java;">
+                   // Split a sentence into words.
+                   KStream&lt;byte[], String&gt; sentences = ...;
+                   KStream&lt;byte[], String&gt; words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
+
+                   // Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances
+               </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Foreach</b>: <code>KStream &rarr; void </code></td>
+            <td>
+                <p>
+                Terminal operation. Performs a stateless action on each record.
+                </p>
+                <p>
+                Note on processing guarantees: Any side effects of an action (such as writing to external systems)
+                are not trackable by Kafka, which means they will typically not benefit from Kafka's processing guarantees.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;String, Long&gt; stream = ...;
+
+                       // Print the contents of the KStream to the local console.
+                       // Java 8+ example, using lambda expressions
+                       stream.foreach((key, value) -> System.out.println(key + " => " + value));
+
+                       // Java 7 example
+                       stream.foreach(
+                           new ForeachAction&lt;String, Long&gt;() {
+                               @Override
+                               public void apply(String key, Long value) {
+                                 System.out.println(key + " => " + value);
+                               }
+                       });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>GroupByKey</b>: <code>KStream &rarr; KGroupedStream </code></td>
+            <td>
+                <p>
+                Groups the records by the existing key.
+                </p>
+                <p>
+                Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly
+                partitioned ("keyed") for subsequent operations.
+                </p>
+                <p>
+                <b>When to set explicit serdes</b>: Variants of <code>groupByKey</code> exist to override the configured default serdes of
+                your application, which you must do if the key and/or value types of the resulting <code>KGroupedStream</code> do
+                not match the configured default serdes.
+                </p>
+                <p>
+                <b>Note:</b>
+                Grouping vs. Windowing: A related operation is windowing, which lets you control how to "sub-group" the
+                grouped records of the same key into so-called windows for stateful operations such as windowed aggregations
+                or windowed joins.
+                </p>
+                <p>
+                Causes data re-partitioning if and only if the stream was marked for re-partitioning. <code>groupByKey</code> is
+                preferable to <code>groupBy</code> because it re-partitions data only if the stream was already marked for re-partitioning.
+                However, <code>groupByKey</code> does not allow you to modify the key or key type like <code>groupBy</code> does.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Group by the existing key, using the application's configured
+                       // default serdes for keys and values.
+                       KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey();
+
+                       // When the key and/or value types do not match the configured
+                       // default serdes, we must explicitly specify serdes.
+                       KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey(
+                           Serialized.with(
+                                Serdes.ByteArray(), /* key */
+                                Serdes.String())     /* value */
+                       );
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>GroupBy</b>: <code>KStream &rarr; KGroupedStream or KTable &rarr; KGroupedTable</code></td>
+            <td>
+                <p>
+                Groups the records by a new key, which may be of a different key type. When grouping a table,
+                you may also specify a new value and value type. groupBy is a shorthand for selectKey(...).groupByKey().
+                </p>
+                <p>
+                Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly
+                partitioned ("keyed") for subsequent operations.
+                </p>
+                <p>
+                <b>When to set explicit serdes</b>: Variants of groupBy exist to override the configured default serdes of your
+                application, which you must do if the key and/or value types of the resulting KGroupedStream or
+                KGroupedTable do not match the configured default serdes.
+                </p>
+                <p>
+                <b>Note:</b>
+                Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the
+                grouped records of the same key into so-called windows for stateful operations such as windowed aggregations
+                or windowed joins.
+                </p>
+                <p>
+                <b>Always causes data re-partitioning:</b> groupBy always causes data re-partitioning. If possible use groupByKey
+                instead, which will re-partition data only if required.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       KTable&lt;byte[], String&gt; table = ...;
+
+                       // Java 8+ examples, using lambda expressions
+
+                       // Group the stream by a new key and key type
+                       KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
+                           (key, value) -> value,
+                           Serialized.with(
+                                Serdes.String(), /* key (note: type was modified) */
+                                Serdes.String())  /* value */
+                       );
+
+                       // Group the table by a new key and key type, and also modify the value and value type.
+                       KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
+                           (key, value) -> KeyValue.pair(value, value.length()),
+                           Serialized.with(
+                               Serdes.String(), /* key (note: type was modified) */
+                               Serdes.Integer()) /* value (note: type was modified) */
+                       );
+
+
+                       // Java 7 examples
+
+                       // Group the stream by a new key and key type
+                       KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
+                           new KeyValueMapper&lt;byte[], String, String&gt;() {
+                               @Override
+                               public String apply(byte[] key, String value) {
+                                  return value;
+                               }
+                           },
+                           Serialized.with(
+                                Serdes.String(), /* key (note: type was modified) */
+                                Serdes.String())  /* value */
+                       );
+
+                       // Group the table by a new key and key type, and also modify the value and value type.
+                       KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
+                            new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
+                            @Override
+                                public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
+                                   return KeyValue.pair(value, value.length());
+                                }
+                            },
+                            Serialized.with(
+                                Serdes.String(), /* key (note: type was modified) */
+                                Serdes.Integer()) /* value (note: type was modified) */
+                       );
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Map</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Takes one record and produces one record. You can modify the record key and value, including their types.
+                </p>
+
+                <p>
+                <b>Marks the stream for data re-partitioning:</b> Applying a grouping or a join after <code>map</code> will result in
+                re-partitioning of the records. If possible use <code>mapValues</code> instead, which will not cause data re-partitioning.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Java 8+ example, using lambda expressions
+                       // Note how we change the key and the key type (similar to `selectKey`)
+                       // as well as the value and the value type.
+                       KStream&lt;String, Integer&gt; transformed = stream.map(
+                           (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
+
+                       // Java 7 example
+                       KStream&lt;String, Integer&gt; transformed = stream.map(
+                           new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
+                           @Override
+                           public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
+                               return new KeyValue&lt;&gt;(value.toLowerCase(), value.length());
+                           }
+                       });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Map (values only)</b>: <code>KStream &rarr; KStream or KTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                Takes one record and produces one record, while retaining the key of the original record. You can modify
+                the record value and the value type.
+                </p>
+                <p>
+                <code>mapValues</code> is preferable to <code>map</code> because it will not cause data re-partitioning. However, it does not
+                allow you to modify the key or key type like <code>map</code> does.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       KTable&lt;String, String&gt; table = ...;
+
+                       // Java 8+ example, using lambda expressions
+                       KStream&lt;byte[], String&gt; uppercased = stream.mapValues(value -> value.toUpperCase());
+
+                       // Java 7 example
+                       KStream&lt;byte[], String&gt; uppercased = stream.mapValues(
+                          new ValueMapper&lt;String, String&gt;() {
+                          @Override
+                          public String apply(String s) {
+                             return s.toUpperCase();
+                          }
+                       });
+
+                       // mapValues on a KTable and also materialize the results into a state store
+                       table.mapValues(value -> value.toUpperCase(), Materialized.&lt;String, String, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("uppercased"));
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Print</b>: <code>KStream &rarr; void </code></td>
+            <td>
+                <p>
+                Terminal operation. Prints the records to <code>System.out</code>. See Javadocs for serde and <code>toString()</code> caveats.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       stream.print();
+                    
+                       // You can also override how and where the data is printed, e.g., to a file:
+                       stream.print(Printed.toFile("stream.out"));
+
+                       // with a custom KeyValueMapper and label
+                       stream.print(Printed.toSysOut()
+                                .withLabel("my-stream")
+                                .withKeyValueMapper((key, value) -> key + " -> " + value));
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>SelectKey</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Assigns a new key, possibly of a new key type, to each record.
+                </p>
+                <p>
+                Marks the stream for data re-partitioning: Applying a grouping or a join after <code>selectKey</code> will result in
+                re-partitioning of the records.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Derive a new record key from the record's value.  Note how the key type changes, too.
+                       // Java 8+ example, using lambda expressions
+                       KStream&lt;String, String&gt; rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
+
+                       // Java 7 example
+                       KStream&lt;String, String&gt; rekeyed = stream.selectKey(
+                           new KeyValueMapper&lt;byte[], String, String&gt;() {
+                           @Override
+                           public String apply(byte[] key, String value) {
+                              return value.split(" ")[0];
+                           }
+                         });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Table to Stream</b>: <code>KTable &rarr; KStream</code></td>
+            <td>
+                <p>
+                Converts this table into a stream.
+                </p>
+                <pre class="brush: java;">
+                       KTable&lt;byte[], String&gt; table = ...;
+
+                       // Also, a variant of `toStream` exists that allows you
+                       // to select a new key for the resulting stream.
+                       KStream&lt;byte[], String&gt; stream = table.toStream();
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>WriteAsText</b>: <code>KStream &rarr; void </code></td>
+            <td>
+                <p>
+                Terminal operation. Writes the records to a file. See Javadocs for serde and <code>toString()</code> caveats.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       stream.writeAsText("/path/to/local/output.txt");
+
+                       // Several variants of `writeAsText` exist to e.g. override the
+                       // default serdes for record keys and record values.
+                       stream.writeAsText("/path/to/local/output.txt", Serdes.ByteArray(), Serdes.String());
+                </pre>
+            </td>
+        </tr>
+        </tbody>
+    </table>
+
+
+    <h5><a id="streams_dsl_transformations_stateful" href="#streams_dsl_transformations_stateful">Stateful transformations</a></h5>
+    <h6><a id="streams_dsl_transformations_stateful_overview" href="#streams_dsl_transformations_stateful_overview">Overview</a></h6>
+    <p>
+        Stateful transformations, by definition, depend on state for processing inputs and producing outputs, and
+        hence implementation-wise they require a state store associated with the stream processor. For example,
+        in aggregating operations, a windowing state store is used to store the latest aggregation results per window;
+        in join operations, a windowing state store is used to store all the records received so far within the
+        defined window boundary.
+    </p>
+    <p>
+        Note that state stores are fault-tolerant. In case of failure, Kafka Streams guarantees to fully restore
+        all state stores prior to resuming the processing.
+    </p>
+    <p>
+        Available stateful transformations in the DSL include:
+    <ul>
+        <li><a href="#streams_dsl_aggregations">Aggregating</a></li>
+        <li><a href="#streams_dsl_joins">Joining</a></li>
+        <li><a href="#streams_dsl_windowing">Windowing (as part of aggregations and joins)</a></li>
+        <li>Applying custom processors and transformers, which may be stateful, for Processor API integration</li>
+    </ul>
+    </p>
+    <p>
+        The following diagram shows their relationships:
+    </p>
+    <figure>
+        <img class="centered" src="/{{version}}/images/streams-stateful_operations.png" style="width:500pt;">
+        <figcaption style="text-align: center;"><i>Stateful transformations in the DSL</i></figcaption>
+    </figure>
+
+    <p>
+        We will discuss the various stateful transformations in detail in the subsequent sections. However, let's start
+        with a first example of a stateful application: the canonical WordCount algorithm.
+    </p>
+    <p>
+        WordCount example in Java 8+, using lambda expressions:
+    </p>
+    <pre class="brush: java;">
+        // We assume record values represent lines of text.  For the sake of this example, we ignore
+        // whatever may be stored in the record keys.
+        KStream&lt;String, String&gt; textLines = ...;
+
+        KStream&lt;String, Long&gt; wordCounts = textLines
+            // Split each text line, by whitespace, into words.  The text lines are the record
+            // values, i.e. we can ignore whatever data is in the record keys and thus invoke
+            // `flatMapValues` instead of the more generic `flatMap`.
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+            // Group the stream by word to ensure the key of the record is the word.
+            .groupBy((key, word) -> word)
+            // Count the occurrences of each word (record key).
+            //
+            // This will change the stream type from `KGroupedStream&lt;String, String&gt;` to
+            // `KTable&lt;String, Long&gt;` (word -> count).  We must provide a name for
+            // the resulting KTable, which will be used to name e.g. its associated
+            // state store and changelog topic.
+            .count("Counts")
+            // Convert the `KTable&lt;String, Long&gt;` into a `KStream&lt;String, Long&gt;`.
+            .toStream();
+    </pre>
+    <p>
+        WordCount example in Java 7:
+    </p>
+    <pre class="brush: java;">
+        // Code below is equivalent to the Java 8+ example above.
+        KStream&lt;String, String&gt; textLines = ...;
+
+        KStream&lt;String, Long&gt; wordCounts = textLines
+            .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+                @Override
+                public Iterable&lt;String&gt; apply(String value) {
+                    return Arrays.asList(value.toLowerCase().split("\\W+"));
+                }
+            })
+            .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
+                @Override
+                public String apply(String key, String word) {
+                    return word;
+                }
+            })
+            .count("Counts")
+            .toStream();
+    </pre>
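+    <p>
+        To make the result of the topology above concrete, here is a minimal plain-Java sketch (it does not use the Kafka Streams API) of what the WordCount computation yields for a finite list of text lines. The class <code>WordCountSketch</code> and the method <code>countWords</code> are hypothetical names chosen for illustration only; a real topology processes an unbounded stream and keeps emitting updated counts.
+    </p>

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Kafka dependency) of what the WordCount topology computes.
// Class and method names are hypothetical and exist only for illustration.
class WordCountSketch {

    // Mirrors flatMapValues + groupBy + count: returns the latest count per word.
    static Map<String, Long> countWords(List<String> textLines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : textLines) {
            // Same splitting rule as the DSL example: lower-case, split on non-word characters.
            for (String word : line.toLowerCase().split("\\W+")) {
                // Rolling count per key (the word), comparable to the "Counts" state store.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("Hello Kafka Streams", "hello world")));
    }
}
```

+    <p>
+        The sketch returns only the final counts, whereas the <code>KTable</code> in the DSL example is a changelog: every update to a word's count can appear as a record in the stream produced by <code>toStream()</code>.
+    </p>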
 
+    <h6><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate a stream</a></h6>
     <p>
-        Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code> or <code>GlobalKTable</code>)
-        can be created as a source stream from one or more Kafka topics (for <code>KTable</code> and <code>GlobalKTable</code> you can only create the source stream
-        from a single topic).
+        Once records are grouped by key via <code>groupByKey</code> or <code>groupBy</code> -- and
+        thus represented as either a <code>KGroupedStream</code> or a
+        <code>KGroupedTable</code> -- they can be aggregated via an operation such as
+        <code>reduce</code>.
+        For windowed aggregations use <code>windowedBy(Windows).reduce(Reducer)</code>.
+        Aggregations are <i>key-based</i> operations, i.e. they always operate over records (notably record values) <i>of the same key</i>.
+        You may choose to perform aggregations on
+        <a href="#streams_dsl_windowing">windowed</a> or non-windowed data.
     </p>
+    <table class="data-table" border="1">
+        <tbody>
+        <tr>
+            <th>Transformation</th>
+            <th>Description</th>
+        </tr>
+        <tr>
+            <td><b>Aggregate</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable
+                &rarr; KTable</code></td>
+            <td>
+                <p>
+                    <b>Rolling aggregation</b>. Aggregates the values of (non-windowed) records by
+                    the grouped key. Aggregating is a generalization of <code>reduce</code> and allows, for example, the
+                    aggregate value to have a different type than the input values.
+                </p>
+                <p>
+                    When aggregating a grouped stream, you must provide an initializer (think:
+                    <code>aggValue = 0</code>) and an "adder"
+                    aggregator (think: <code>aggValue + curValue</code>). When aggregating a <i>grouped</i>
+                    table, you must additionally provide a "subtractor" aggregator (think: <code>aggValue - oldValue</code>).
+                </p>
+                <p>
+                    Several variants of <code>aggregate</code> exist, see Javadocs for details.
+                </p>
+                <pre class="brush: java;">
+                    KGroupedStream&lt;Bytes, String&gt; groupedStream = ...;
+                    KGroupedTable&lt;Bytes, String&gt; groupedTable = ...;
+
+                    // Java 8+ examples, using lambda expressions
+
+                    // Aggregating a KGroupedStream (note how the value type changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
+                        () -> 0L, /* initializer */
+                        (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
+                        Serdes.Long(), /* serde for aggregate value */
+                        "aggregated-stream-store" /* state store name */);
+
+                    // Aggregating a KGroupedTable (note how the value type changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
+                        () -> 0L, /* initializer */
+                        (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
+                        (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
+                        Serdes.Long(), /* serde for aggregate value */
+                        "aggregated-table-store" /* state store name */);
+
+
+                    // windowed aggregation
+                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
+                        .aggregate(() -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* aggregator */
+                            Serdes.Long()); /* serde for aggregate value */
+
+
+                    // Java 7 examples
+
+                    // Aggregating a KGroupedStream (note how the value type changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
+                        new Initializer&lt;Long&gt;() { /* initializer */
+                          @Override
+                          public Long apply() {
+                            return 0L;
+                          }
+                        },
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
+                          @Override
+                          public Long apply(Bytes aggKey, String newValue, Long aggValue) {
+                            return aggValue + newValue.length();
+                          }
+                        },
+                        Serdes.Long(),
+                        "aggregated-stream-store");
+
+                    // Aggregating a KGroupedTable (note how the value type changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
+                        new Initializer&lt;Long&gt;() { /* initializer */
+                          @Override
+                          public Long apply() {
+                            return 0L;
+                          }
+                        },
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
+                          @Override
+                          public Long apply(Bytes aggKey, String newValue, Long aggValue) {
+                            return aggValue + newValue.length();
+                          }
+                        },
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* subtractor */
+                          @Override
+                          public Long apply(Bytes aggKey, String oldValue, Long aggValue) {
+                            return aggValue - oldValue.length();
+                          }
+                        },
+                        Serdes.Long(),
+                        "aggregated-table-store");
+
+                    // Windowed aggregation
+                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
+                              @Override
+                              public Long apply(Bytes aggKey, String newValue, Long aggValue) {
+                                return aggValue + newValue.length();
+                              }
+                            },
+                            Serdes.Long());
+                </pre>
+                <p>
+                    Detailed behavior of <code>KGroupedStream</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys are ignored in general.</li>
+                    <li>When a record key is received for the first time, the initializer is called
+                        (and called before the adder).</li>
+                    <li>Whenever a record with a non-<code>null</code> value is received, the adder is called.</li>
+                </ul>
+                <p>
+                    Detailed behavior of <code>KGroupedTable</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys are ignored in general.</li>
+                    <li>When a record key is received for the first time, the initializer is called
+                        (and called before the adder and subtractor). Note that, in contrast to <code>KGroupedStream</code>, over
+                        time the initializer may be called more
+                        than once for a key as a result of having received input tombstone records
+                        for that key (see below).</li>
+                    <li>When the first non-<code>null</code> value is received for a key (think:
+                        INSERT), then only the adder is called.</li>
+                    <li>When subsequent non-<code>null</code> values are received for a key (think:
+                        UPDATE), then (1) the subtractor is called
+                        with the old value as stored in the table and (2) the adder is called with
+                        the new value of the input record
+                        that was just received. The order of execution for the subtractor and adder
+                        is not defined.</li>
+                    <li>When a tombstone record -- i.e. a record with a <code>null</code> value -- is
+                        received for a key (think: DELETE), then
+                        only the subtractor is called. Note that, whenever the subtractor returns a
+                    <code>null</code> value itself, then the
+                    corresponding key is removed from the resulting KTable. If that happens, any
+                    next input record for that key will trigger the initializer again.</li>
+                </ul>
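+                <p>
+                    The INSERT / UPDATE / DELETE rules above can be sketched in plain Java without the Kafka Streams API. The sketch below is an illustration under stated assumptions, not the library's implementation: the class <code>TableAggregateSketch</code>, its methods, and the grouping rule (rows are grouped by the first character of their row key) are all hypothetical. It uses the same adder and subtractor as the example above (sum of value lengths).
+                </p>

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (no Kafka dependency) of adder/subtractor bookkeeping for a grouped table.
// All names are hypothetical; this is not the Kafka Streams implementation.
class TableAggregateSketch {
    private final Map<String, String> rows = new HashMap<>(); // upstream table: row key -> value
    private final Map<String, Long> totals = new HashMap<>(); // result: group key -> sum of value lengths

    // Hypothetical grouping rule: rows are grouped by the first character of their row key.
    private static String groupOf(String rowKey) {
        return rowKey.substring(0, 1);
    }

    // Applies one change of the upstream table; a null value is a tombstone (DELETE).
    void apply(String rowKey, String newValue) {
        String group = groupOf(rowKey);
        String oldValue = (newValue == null) ? rows.remove(rowKey) : rows.put(rowKey, newValue);
        if (oldValue == null && newValue == null) {
            return; // tombstone for a row that was never stored: nothing to subtract
        }
        long agg = totals.getOrDefault(group, 0L); // initializer: 0L the first time a group is seen
        if (oldValue != null) {
            agg -= oldValue.length(); // subtractor: remove the contribution of the old value
        }
        if (newValue != null) {
            agg += newValue.length(); // adder: add the contribution of the new value
        }
        totals.put(group, agg);
    }

    Long total(String group) {
        return totals.get(group);
    }

    public static void main(String[] args) {
        TableAggregateSketch sketch = new TableAggregateSketch();
        sketch.apply("a1", "abc"); // INSERT: adder only
        sketch.apply("a2", "de");  // INSERT: adder only
        sketch.apply("a1", "z");   // UPDATE: subtractor (old value) and adder (new value)
        sketch.apply("a1", null);  // DELETE: subtractor only
        System.out.println(sketch.total("a"));
    }
}
```

+                <p>
+                    Because the adder and subtractor in this sketch are a plain sum and difference, the result does not depend on which of the two runs first, which matches the note above that their order of execution is not defined.
+                </p>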
+                <p>
+                    See the example at the bottom of this section for a visualization of the
+                    aggregation semantics.
+                </p>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Aggregate (windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
+            <td>
+                <p>
+                    <b>Windowed aggregation</b>. Aggregates the values of records, per window, by
+                    the grouped key. Aggregating is a generalization of
+                    <code>reduce</code> and allows, for example, the aggregate value to have a
+                    different type than the input values.
+                </p>
+                <p>
+                    You must provide an initializer (think: <code>aggValue = 0</code>), "adder"
+                    aggregator (think: <code>aggValue + curValue</code>),
+                    and a window. When windowing based on sessions, you must additionally provide a
+                    "session merger" aggregator (think:
+                    <code>mergedAggValue = leftAggValue + rightAggValue</code>).
+                </p>
+                <p>
+                    The windowed <code>aggregate</code> turns a <code>KGroupedStream&lt;K, V&gt;</code>
+                    into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
+                </p>
+                <p>
+                    Several variants of <code>aggregate</code> exist, see Javadocs for details.
+                </p>
+
+                <pre class="brush: java;">
+                    import java.util.concurrent.TimeUnit;
+                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
+
+                    // Java 8+ examples, using lambda expressions
+
+                    // Aggregating with time-based windowing (here: with 5-minute tumbling windows)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
+                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
+                        .aggregate(
+                            () -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
+                            Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
+                                .withValueSerde(Serdes.Long())); /* serde for aggregate value */
+
+
+                    // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream
+                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
+                        .aggregate(
+                            () -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
+                            (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
+                            Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
+                                .withValueSerde(Serdes.Long())); /* serde for aggregate value */
+
+                    // Java 7 examples
+
+                    // Aggregating with time-based windowing (here: with 5-minute tumbling windows)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream
+                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;String, Long, Long&gt;() { /* adder */
+                              @Override
+                              public Long apply(String aggKey, Long newValue, Long aggValue) {
+                                return aggValue + newValue;
+                              }
+                            },
+                            Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
+                                    .withValueSerde(Serdes.Long()) /* serde for aggregate value */
+                    );
+
+                    // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream
+                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;String, Long, Long&gt;() { /* adder */
+                              @Override
+                              public Long apply(String aggKey, Long newValue, Long aggValue) {
+                                return aggValue + newValue;
+                              }
+                            },
+                            new Merger&lt;String, Long&gt;() { /* session merger */
+                              @Override
+                              public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
+                                return rightAggValue + leftAggValue;
+                              }
+                            },
+                            Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
+                                .withValueSerde(Serdes.Long()) /* serde for aggregate value */
+                    );
+                </pre>
+
+                <p>
+                    Detailed behavior:
+                </p>
+                <ul>
+                    <li>The windowed aggregate behaves similarly to the rolling aggregate described
+                        above. The additional twist is that the behavior applies per window.</li>
+                    <li>Input records with <code>null</code> keys are ignored in general.</li>
+                    <li>When a record key is received for the first time for a given window, the
+                        initializer is called (and called before the adder).</li>
+                    <li>Whenever a record with a non-<code>null</code> value is received for a given window, the
+                        adder is called.
+                        (Note: As a result of a known bug in Kafka 0.11.0.0, the adder is currently
+                        also called for <code>null</code> values. You can work around this, for example, by
+                        manually filtering out <code>null</code> values prior to grouping the stream.)</li>
+                    <li>When using session windows: the session merger is called whenever two
+                        sessions are being merged.</li>
+                </ul>
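+                <p>
+                    The "per window" behavior for time-based windows can be sketched in plain Java without the Kafka Streams API. The sketch assumes 5-minute tumbling windows aligned to the epoch and non-negative timestamps in milliseconds; the class <code>TumblingWindowSketch</code>, its methods, and the <code>key@windowStart</code> string encoding are hypothetical and chosen only for illustration. Session windows are not covered here.
+                </p>

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch (no Kafka dependency) of a per-window rolling sum with tumbling windows.
// All names are hypothetical; timestamps are assumed to be non-negative milliseconds.
class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = TimeUnit.MINUTES.toMillis(5);

    // Maps the hypothetical composite key "key@windowStart" to the aggregate of that window.
    private final Map<String, Long> sums = new TreeMap<>();

    // Start of the epoch-aligned tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Adds a record's value to the aggregate of its (key, window) pair.
    void add(String key, long value, long timestampMs) {
        String windowedKey = key + "@" + windowStart(timestampMs);
        // First record for this key in this window: same result as initializer 0L plus adder.
        // Later records for the same key and window: adder only.
        sums.merge(windowedKey, value, Long::sum);
    }

    Long get(String key, long windowStartMs) {
        return sums.get(key + "@" + windowStartMs);
    }

    public static void main(String[] args) {
        TumblingWindowSketch sketch = new TumblingWindowSketch();
        sketch.add("k", 1L, 0L);
        sketch.add("k", 2L, 299_999L);  // still inside the window starting at 0
        sketch.add("k", 10L, 300_000L); // first record of the next window
        System.out.println(sketch.get("k", 0L) + " " + sketch.get("k", 300_000L));
    }
}
```

+                <p>
+                    Each key keeps one independent aggregate per window, which is why the result type of a windowed aggregation is keyed by <code>Windowed&lt;K&gt;</code> rather than by <code>K</code>.
+                </p>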
+                <p>
+                See the example at the bottom of this section for a visualization of the aggregation semantics.
+                </p>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Count</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                    <b>Rolling aggregation</b>. Counts the number of records by the grouped key.
+                    Several variants of <code>count</code> exist, see Javadocs for details.
+                </p>
+                <pre class="brush: java;">
+                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
+                    KGroupedTable&lt;String, Long&gt; groupedTable = ...;
+
+                    // Counting a KGroupedStream
+                    KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count();
+
+                    // Counting a KGroupedTable
+                    KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count();
+                </pre>
+                <p>
+                    Detailed behavior for <code>KGroupedStream</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys or values are ignored.</li>
+                </ul>
+                <p>
+                    Detailed behavior for <code>KGroupedTable</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys are ignored. Records with <code>null</code>
+                        values are not ignored but interpreted as "tombstones" for the corresponding key, which
+                        indicate the deletion of the key from the table.</li>
+                </ul>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Count (Windowed)</b>: <code>KGroupedStream &rarr; KTable</code></td>
+            <td>
+                <p>
+                    <b>Windowed aggregation</b>. Counts the number of records, per window, by the grouped key.
+                </p>
+                <p>
+                    The windowed <code>count</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, Long&gt;</code>.
+                </p>
+                <p>
+                    Several variants of count exist, see Javadocs for d

<TRUNCATED>
