kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: Update Streams docs: quickstart and concepts
Date Thu, 02 Feb 2017 18:36:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7436d28a2 -> c8baf2854


MINOR: Update Streams docs: quickstart and concepts

Added figures for topology and table-stream duality; added sections about aggregations; misc
code fixes.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Sriram Subramanian

Closes #2482 from guozhangwang/KMinor-streams-docs-first-pass


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c8baf285
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c8baf285
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c8baf285

Branch: refs/heads/trunk
Commit: c8baf28545984ef8c88d43899004224c6ef54013
Parents: 7436d28
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Feb 2 10:35:58 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Feb 2 10:35:58 2017 -0800

----------------------------------------------------------------------
 docs/images/streams-concepts-topology.jpg | Bin 0 -> 136983 bytes
 docs/images/streams-table-duality-01.png  | Bin 0 -> 14534 bytes
 docs/images/streams-table-duality-02.png  | Bin 0 -> 56736 bytes
 docs/images/streams-table-duality-03.png  | Bin 0 -> 91331 bytes
 docs/images/streams-table-updates-01.png  | Bin 0 -> 78069 bytes
 docs/images/streams-table-updates-02.png  | Bin 0 -> 91880 bytes
 docs/js/templateData.js                   |   1 +
 docs/quickstart.html                      |  61 ++++++--
 docs/streams.html                         | 189 +++++++++++++++++--------
 9 files changed, 181 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/images/streams-concepts-topology.jpg
----------------------------------------------------------------------
diff --git a/docs/images/streams-concepts-topology.jpg b/docs/images/streams-concepts-topology.jpg
new file mode 100644
index 0000000..832f6d4
Binary files /dev/null and b/docs/images/streams-concepts-topology.jpg differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/images/streams-table-duality-01.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-table-duality-01.png b/docs/images/streams-table-duality-01.png
new file mode 100644
index 0000000..4fa4d1b
Binary files /dev/null and b/docs/images/streams-table-duality-01.png differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/images/streams-table-duality-02.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-table-duality-02.png b/docs/images/streams-table-duality-02.png
new file mode 100644
index 0000000..4e805c1
Binary files /dev/null and b/docs/images/streams-table-duality-02.png differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/images/streams-table-duality-03.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-table-duality-03.png b/docs/images/streams-table-duality-03.png
new file mode 100644
index 0000000..b0b04f5
Binary files /dev/null and b/docs/images/streams-table-duality-03.png differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/images/streams-table-updates-01.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-table-updates-01.png b/docs/images/streams-table-updates-01.png
new file mode 100644
index 0000000..3a2c35e
Binary files /dev/null and b/docs/images/streams-table-updates-01.png differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/images/streams-table-updates-02.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-table-updates-02.png b/docs/images/streams-table-updates-02.png
new file mode 100644
index 0000000..a0a5b1f
Binary files /dev/null and b/docs/images/streams-table-updates-02.png differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/js/templateData.js
----------------------------------------------------------------------
diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 40c5da1..fbb9e4e 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -18,4 +18,5 @@ limitations under the License.
 // Define variables for doc templates
 var context={
     "version": "0101"
+    "dotVersion": "0.10.1"
 };
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 763d3e3..2080cc4 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -279,18 +279,30 @@ data in the topic (or use custom consumer code to process it):
 <p>
 Kafka Streams is a client library of Kafka for real-time stream processing and analyzing
data stored in Kafka brokers.
 This quickstart example will demonstrate how to run a streaming application coded in this
library. Here is the gist
-of the <code>WordCountDemo</code> example code (converted to use Java 8 lambda
expressions for easy reading).
+of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java">WordCountDemo</a></code>
example code (converted to use Java 8 lambda expressions for easy reading).
 </p>
 <pre>
+// Serializers/deserializers (serde) for String and Long types
+final Serde&lt;String&gt; stringSerde = Serdes.String();
+final Serde&lt;Long&gt; longSerde = Serdes.Long();
+
+// Construct a `KStream` from the input topic "streams-file-input", where message values
+// represent lines of text (for the sake of this example, we ignore whatever may be stored
+// in the message keys).
+KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde,
"streams-file-input");
+
 KTable&lt;String, Long&gt; wordCounts = textLines
     // Split each text line, by whitespace, into words.
     .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
 
-    // Ensure the words are available as record keys for the next aggregate operation.
-    .map((key, value) -> new KeyValue<>(value, value))
+    // Group the text words as message keys
+    .groupBy((key, value) -> value)
 
-    // Count the occurrences of each word (record key) and store the results into a table
named "Counts".
-    .countByKey("Counts")
+    // Count the occurrences of each word (message key).
+    .count("Counts");
+
+// Store the running counts as a changelog stream to the output topic.
+wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
 </pre>
 
 <p>
@@ -303,7 +315,7 @@ unbounded input data, it will periodically output its current state and
results
 because it cannot know when it has processed "all" the input data.
 </p>
 <p>
-We will now prepare input data to a Kafka topic, which will subsequently be processed by
a Kafka Streams application.
+As the first step, we will prepare input data to a Kafka topic, which will subsequently be
processed by a Kafka Streams application.
 </p>
 
 <!--
@@ -329,7 +341,8 @@ Or on Windows:
 </pre>
 
 <p>
-Next, we send this input data to the input topic named <b>streams-file-input</b>
using the console producer (in practice,
+Next, we send this input data to the input topic named <b>streams-file-input</b>
using the console producer,
+which reads the data from STDIN line-by-line, and publishes each line as a separate Kafka
message with a null key and the value encoded as a string to the topic (in practice,
 stream data will likely be flowing continuously into Kafka where the application will be
up and running):
 </p>
 
@@ -343,7 +356,7 @@ stream data will likely be flowing continuously into Kafka where the application
 
 
 <pre>
-&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
< file-input.txt</b>
+&gt; <b>cat file-input.txt | ./bin/kafka-console-producer.sh --broker-list localhost:9092
--topic streams-file-input</b>
 </pre>
 
 <p>
@@ -355,7 +368,9 @@ We can now run the WordCount demo application to process the input data:
 </pre>
 
 <p>
-There won't be any STDOUT output except log entries as the results are continuously written
back into another topic named <b>streams-wordcount-output</b> in Kafka.
+The demo application will read from the input topic <b>streams-file-input</b>,
perform the computations of the WordCount algorithm on each of the read messages,
+and continuously write its current results to the output topic <b>streams-wordcount-output</b>.
+Hence there won't be any STDOUT output except log entries as the results are written back
into Kafka.
 The demo will run for a few seconds and then, unlike typical stream processing applications,
terminate automatically.
 </p>
 <p>
@@ -379,9 +394,12 @@ with the following output data being printed to the console:
 
 <pre>
 all     1
+streams 1
 lead    1
 to      1
+kafka   1
 hello   1
+kafka   2
 streams 2
 join    1
 kafka   3
@@ -389,12 +407,35 @@ summit  1
 </pre>
 
 <p>
-Here, the first column is the Kafka message key, and the second column is the message value,
both in <code>java.lang.String</code> format.
+Here, the first column is the Kafka message key in <code>java.lang.String</code>
format, and the second column is the message value in <code>java.lang.Long</code>
format.
 Note that the output is actually a continuous stream of updates, where each data record (i.e.
each line in the original output above) is
 an updated count of a single word, aka record key such as "kafka". For multiple records with
the same key, each later record is an update of the previous one.
 </p>
 
 <p>
+The two diagrams below illustrate what is essentially happening behind the scenes.
+The first column shows the evolution of the current state of the <code>KTable&lt;String,
Long&gt;</code> that is counting word occurrences for <code>count</code>.
+The second column shows the change records that result from state updates to the KTable and
that are being sent to the output Kafka topic <b>streams-wordcount-output</b>.
+</p>
+
+<p>
+First, the text line “all streams lead to kafka” is processed.
+The <code>KTable</code> is being built up as each new word results in a new table
entry (highlighted with a green background), and a corresponding change record is sent to
the downstream <code>KStream</code>.
+</p>
+<img class="centered" src="/{{version}}/images/streams-table-updates-01.png">
+<p>
+When the second text line “hello kafka streams” is processed, we observe, for the first
time, that existing entries in the <code>KTable</code> are being updated (here:
for the words “kafka” and for “streams”). And again, change records are being sent
to the output topic.
+</p>
+<img class="centered" src="/{{version}}/images/streams-table-updates-01.png">
+<p>
+And so on (we skip the illustration of how the third line is being processed). This explains
why the output topic has the contents we showed above, because it contains the full record
of changes.
+</p>
+
+<p>
+Looking beyond the scope of this concrete example, what Kafka Streams is doing here is leveraging
the duality between a table and a changelog stream (here: table = the KTable, changelog
stream = the downstream KStream): you can publish every change of the table to a stream, and
if you consume the entire changelog stream from beginning to end, you can reconstruct the
contents of the table.
+</p>
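+
+<p>
+For illustration only (this is not part of the demo code), both directions of the duality map onto the API:
+the table can be converted into its changelog stream, and the changelog topic can be read back as a table.
+The store name "restored-counts" below is an illustrative placeholder, and a <code>table()</code> overload
+taking explicit serdes and a store name is assumed:
+</p>
+<pre>
+// Table -> stream: every update to the KTable becomes a record in the changelog stream.
+KStream&lt;String, Long&gt; changelog = wordCounts.toStream();
+
+// Stream -> table: replaying the changelog topic from the beginning reconstructs the table.
+KTable&lt;String, Long&gt; restored =
+    builder.table(stringSerde, longSerde, "streams-wordcount-output", "restored-counts");
+</pre>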
+
+<p>
 Now you can write more input messages to the <b>streams-file-input</b> topic
and observe additional messages added
 to <b>streams-wordcount-output</b> topic, reflecting updated word counts (e.g.,
using the console producer and the
 console consumer, as described above).

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8baf285/docs/streams.html
----------------------------------------------------------------------
diff --git a/docs/streams.html b/docs/streams.html
index dec17ef..af1b2b0 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -17,7 +17,7 @@
 
 <script><!--#include virtual="js/templateData.js" --></script>
 
-<script id="streams-template" type="text/x-handlebars-template">
+<pre id="streams-template" type="text/x-handlebars-template">
     <h1>Streams</h1>
 
         <ol class="toc">
@@ -25,6 +25,9 @@
                 <a href="#streams_overview">Overview</a>
             </li>
             <li>
+                <a href="#streams_concepts">Overview</a>
+            </li>
+            <li>
                 <a href="#streams_developer">Developer guide</a>
                 <ul>
                     <li><a href="#streams_concepts">Core concepts</a>
@@ -34,7 +37,7 @@
             </li>
         </ol>
 
-        <h2><a id="streams_overview" href="#overview">Overview</a></h2>
+        <h2><a id="streams_overview" href="#streams_overview">Overview</a></h2>
 
         <p>
         Kafka Streams is a client library for processing and analyzing data stored in Kafka
and either write the resulting data back to Kafka or send the final output to an external
system. It builds upon important stream processing concepts such as properly distinguishing
between event time and processing time, windowing support, and simple yet efficient management
of application state.
@@ -47,70 +50,66 @@
         <ul>
             <li>Designed as a <b>simple and lightweight client library</b>,
which can be easily embedded in any Java application and integrated with any existing packaging,
deployment and operational tools that users have for their streaming applications.</li>
             <li>Has <b>no external dependencies on systems other than Apache
Kafka itself</b> as the internal messaging layer; notably, it uses Kafka's partitioning
model to horizontally scale processing while maintaining strong ordering guarantees.</li>
-            <li>Supports <b>fault-tolerant local state</b>, which enables
very fast and efficient stateful operations like joins and windowed aggregations.</li>
-            <li>Employs <b>one-record-at-a-time processing</b> to achieve
low processing latency, and supports <b>event-time based windowing operations</b>.</li>
+            <li>Supports <b>fault-tolerant local state</b>, which enables
very fast and efficient stateful operations like joins and aggregations.</li>
+            <li>Employs <b>one-record-at-a-time processing</b> to achieve
millisecond processing latency, and supports <b>event-time based windowing operations</b>
with late arrival of records.</li>
             <li>Offers necessary stream processing primitives, along with a <b>high-level
Streams DSL</b> and a <b>low-level Processor API</b>.</li>
 
         </ul>
+        <br>
 
-        <h2><a id="streams_developer" href="#streams_developer">Developer Guide</a></h2>
-
-        <p>
-        There is a <a href="#quickstart_kafkastreams">quickstart</a> example
that provides how to run a stream processing program coded in the Kafka Streams library.
-        This section focuses on how to write, configure, and execute a Kafka Streams application.
-        </p>
-
-        <h4><a id="streams_concepts" href="#streams_concepts">Core Concepts</a></h4>
+        <h2><a id="streams_concepts" href="#streams_concepts">Overview</a></h2>
 
         <p>
-        We first summarize the key concepts of Kafka Streams.
+            We first summarize the key concepts of Kafka Streams.
         </p>
 
         <h5><a id="streams_topology" href="#streams_topology">Stream Processing
Topology</a></h5>
 
         <ul>
             <li>A <b>stream</b> is the most important abstraction provided
by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is
an ordered, replayable, and fault-tolerant sequence of immutable data records, where a <b>data
record</b> is defined as a key-value pair.</li>
-            <li>A <b>stream processing application</b> written in Kafka
Streams defines its computational logic through one or more <b>processor topologies</b>,
where a processor topology is a graph of stream processors (nodes) that are connected by streams
(edges).</li>
-            <li>A <b>stream processor</b> is a node in the processor topology;
it represents a processing step to transform data in streams by receiving one input record
at a time from its upstream processors in the topology, applying its operation to it, and
may subsequently producing one or more output records to its downstream processors.</li>
+            <li>A <b>stream processing application</b> is any program that
makes use of the Kafka Streams library. It defines its computational logic through one or
more <b>processor topologies</b>, where a processor topology is a graph of stream
processors (nodes) that are connected by streams (edges).</li>
+            <li>A <b>stream processor</b> is a node in the processor topology;
it represents a processing step to transform data in streams by receiving one input record
at a time from its upstream processors in the topology, applying its operation to it, and
may subsequently produce one or more output records to its downstream processors. </li>
         </ul>
+        <img class="centered" src="/{{version}}/images/streams-concepts-topology.jpg">
+
 
         <p>
-        Kafka Streams offers two ways to define the stream processing topology: the <a
href="#streams_dsl"><b>Kafka Streams DSL</b></a> provides
-        the most common data transformation operations such as <code>map</code>
and <code>filter</code>; the lower-level <a href="#streams_processor"><b>Processor
API</b></a> allows
-        developers define and connect custom processors as well as to interact with <a
href="#streams_state">state stores</a>.
+            Kafka Streams offers two ways to define the stream processing topology: the <a
href="#streams_dsl"><b>Kafka Streams DSL</b></a> provides
+            the most common data transformation operations such as <code>map</code>,
<code>filter</code>, <code>join</code> and <code>aggregations</code>
out of the box; the lower-level <a href="#streams_processor"><b>Processor API</b></a>
allows
+            developers to define and connect custom processors as well as to interact with <a
href="#streams_state">state stores</a>.
         </p>
 
         <h5><a id="streams_time" href="#streams_time">Time</a></h5>
 
         <p>
-        A critical aspect in stream processing is the notion of <b>time</b>,
and how it is modeled and integrated.
-        For example, some operations such as <b>windowing</b> are defined based
on time boundaries.
+            A critical aspect in stream processing is the notion of <b>time</b>,
and how it is modeled and integrated.
+            For example, some operations such as <b>windowing</b> are defined
based on time boundaries.
         </p>
         <p>
-        Common notions of time in streams are:
+            Common notions of time in streams are:
         </p>
 
         <ul>
-            <li><b>Event time</b> - The point in time when an event or
data record occurred, i.e. was originally created "at the source".</li>
-            <li><b>Processing time</b> - The point in time when the event
or data record happens to be processed by the stream processing application, i.e. when the
record is being consumed. The processing time may be milliseconds, hours, or days etc. later
than the original event time.</li>
-            <li><b>Ingestion time</b> - The point in time when an event
or data record is stored in a topic partition by a Kafka broker. The difference to event time
is that this ingestion timestamp is generated when the record is appended to the target topic
by the Kafka broker, not when the record is created "at the source". The difference to processing
time is that processing time is when the stream processing application processes the record.
For example, if a record is never processed, there is no notion of processing time for it,
but it still has an ingestion time.
+            <li><b>Event time</b> - The point in time when an event or
data record occurred, i.e. was originally created "at the source". <b>Example:</b>
If the event is a geo-location change reported by a GPS sensor in a car, then the associated
event-time would be the time when the GPS sensor captured the location change.</li>
+            <li><b>Processing time</b> - The point in time when the event
or data record happens to be processed by the stream processing application, i.e. when the
record is being consumed. The processing time may be milliseconds, hours, or days etc. later
than the original event time. <b>Example:</b> Imagine an analytics application
that reads and processes the geo-location data reported from car sensors to present it to
a fleet management dashboard. Here, processing-time in the analytics application might be
milliseconds or seconds (e.g. for real-time pipelines based on Apache Kafka and Kafka Streams)
or hours (e.g. for batch pipelines based on Apache Hadoop or Apache Spark) after event-time.</li>
+            <li><b>Ingestion time</b> - The point in time when an event
or data record is stored in a topic partition by a Kafka broker. The difference to event time
is that this ingestion timestamp is generated when the record is appended to the target topic
by the Kafka broker, not when the record is created "at the source". The difference to processing
time is that processing time is when the stream processing application processes the record.
<b>For example,</b> if a record is never processed, there is no notion of processing
time for it, but it still has an ingestion time.
         </ul>
         <p>
-        The choice between event-time and ingestion-time is actually done through the configuration
of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded
into Kafka messages. Depending on Kafka's configuration these timestamps represent event-time
or ingestion-time. The respective Kafka configuration setting can be specified on the broker
level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded
timestamps as-is. Hence, the effective time semantics of your application depend on the effective
Kafka configuration for these embedded timestamps.
+            The choice between event-time and ingestion-time is actually done through the
configuration of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically
embedded into Kafka messages. Depending on Kafka's configuration these timestamps represent
event-time or ingestion-time. The respective Kafka configuration setting can be specified
on the broker level or per topic. The default timestamp extractor in Kafka Streams will retrieve
these embedded timestamps as-is. Hence, the effective time semantics of your application depend
on the effective Kafka configuration for these embedded timestamps.
         </p>
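+        <p>
+            Concretely (shown here only for illustration; this is plain Kafka configuration, not Kafka Streams),
+            the timestamp type is controlled by a broker-level default and an optional per-topic override:
+        </p>
+        <pre>
+            # server.properties: broker-level default for the embedded record timestamps
+            log.message.timestamp.type=CreateTime      # event-time; use LogAppendTime for ingestion-time
+
+            # topic-level override of the same setting
+            message.timestamp.type=LogAppendTime
+        </pre>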
         <p>
-        Kafka Streams assigns a <b>timestamp</b> to every data record
-        via the <code>TimestampExtractor</code> interface.
-        Concrete implementations of this interface may retrieve or compute timestamps based
on the actual contents of data records such as an embedded timestamp field
-        to provide event-time semantics, or use any other approach such as returning the
current wall-clock time at the time of processing,
-        thereby yielding processing-time semantics to stream processing applications.
-        Developers can thus enforce different notions of time depending on their business
needs. For example,
-        per-record timestamps describe the progress of a stream with regards to time (although
records may be out-of-order within the stream) and
-        are leveraged by time-dependent operations such as joins.
+            Kafka Streams assigns a <b>timestamp</b> to every data record
+            via the <code>TimestampExtractor</code> interface.
+            Concrete implementations of this interface may retrieve or compute timestamps
based on the actual contents of data records such as an embedded timestamp field
+            to provide event-time semantics, or use any other approach such as returning
the current wall-clock time at the time of processing,
+            thereby yielding processing-time semantics to stream processing applications.
+            Developers can thus enforce different notions of time depending on their business
needs. For example,
+            per-record timestamps describe the progress of a stream with regards to time
(although records may be out-of-order within the stream) and
+            are leveraged by time-dependent operations such as joins.
         </p>
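+        <p>
+            For illustration, a payload-based extractor for event-time semantics might look roughly as
+            follows (a sketch only: <code>Order</code> is a hypothetical value type, and the exact
+            <code>TimestampExtractor</code> method signature differs between Kafka versions):
+        </p>
+        <pre>
+            public class OrderTimestampExtractor implements TimestampExtractor {
+                @Override
+                public long extract(ConsumerRecord&lt;Object, Object&gt; record) {
+                    // Prefer the event time embedded in the message payload, if present ...
+                    if (record.value() instanceof Order) {
+                        return ((Order) record.value()).getTimestamp();
+                    }
+                    // ... otherwise fall back to the timestamp embedded in the Kafka record itself.
+                    return record.timestamp();
+                }
+            }
+        </pre>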
 
         <p>
-        Finally, whenever a Kafka Streams application writes records to Kafka, then it will
also assign timestamps to these new records. The way the timestamps are assigned depends on
the context:
+            Finally, whenever a Kafka Streams application writes records to Kafka, then it
will also assign timestamps to these new records. The way the timestamps are assigned depends
on the context:
         <ul>
             <li> When new output records are generated via processing some input record,
for example, <code>context.forward()</code> triggered in the <code>process()</code>
function call, output record timestamps are inherited from input record timestamps directly.</li>
             <li> When new output records are generated via periodic functions such
as <code>punctuate()</code>, the output record timestamp is defined as the current
internal time (obtained through <code>context.timestamp()</code>) of the stream
task.</li>
@@ -121,22 +120,31 @@
         <h5><a id="streams_state" href="#streams_state">States</a></h5>
 
         <p>
-        Some stream processing applications don't require state, which means the processing
of a message is independent from
-        the processing of all other messages.
-        However, being able to maintain state opens up many possibilities for sophisticated
stream processing applications: you
-        can join input streams, or group and aggregate data records. Many such stateful operators
are provided by the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a>.
+            Some stream processing applications don't require state, which means the processing
of a message is independent from
+            the processing of all other messages.
+            However, being able to maintain state opens up many possibilities for sophisticated
stream processing applications: you
+            can join input streams, or group and aggregate data records. Many such stateful
operators are provided by the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a>.
         </p>
         <p>
-        Kafka Streams provides so-called <b>state stores</b>, which can be used
by stream processing applications to store and query data.
-        This is an important capability when implementing stateful operations.
-        Every task in Kafka Streams embeds one or more state stores that can be accessed
via APIs to store and query data required for processing.
-        These state stores can either be a persistent key-value store, an in-memory hashmap,
or another convenient data structure.
-        Kafka Streams offers fault-tolerance and automatic recovery for local state stores.
+            Kafka Streams provides so-called <b>state stores</b>, which can be
used by stream processing applications to store and query data.
+            This is an important capability when implementing stateful operations.
+            Every task in Kafka Streams embeds one or more state stores that can be accessed
via APIs to store and query data required for processing.
+            These state stores can either be a persistent key-value store, an in-memory hashmap,
or another convenient data structure.
+            Kafka Streams offers fault-tolerance and automatic recovery for local state stores.
         </p>
         <p>
-        Kafka Streams allows direct read-only queries of the state stores by methods, threads,
processes or applications external to the stream processing application that created the state
stores. This is provided through a feature called <b>Interactive Queries</b>.
All stores are named and Interactive Queries exposes only the read operations of the underlying
implementation. 
+            Kafka Streams allows direct read-only queries of the state stores by methods,
threads, processes or applications external to the stream processing application that created
the state stores. This is provided through a feature called <b>Interactive Queries</b>.
All stores are named and Interactive Queries exposes only the read operations of the underlying
implementation.
         </p>
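+        <p>
+            For example, a running application could expose the word counts from the quickstart through
+            such a read-only query (a sketch, assuming a state store named "Counts" and a started
+            <code>KafkaStreams</code> instance called <code>streams</code>):
+        </p>
+        <pre>
+            // Look up the local state store that was registered under the name "Counts".
+            ReadOnlyKeyValueStore&lt;String, Long&gt; counts =
+                streams.store("Counts", QueryableStoreTypes.keyValueStore());
+
+            // Interactive Queries only expose read operations, e.g. point lookups.
+            Long kafkaCount = counts.get("kafka");
+        </pre>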
         <br>
+
+        <h2><a id="streams_developer" href="#streams_developer">Developer Guide</a></h2>
+
+        <p>
+        There is a <a href="#quickstart_kafkastreams">quickstart</a> example
that shows how to run a stream processing program coded in the Kafka Streams library.
+        This section focuses on how to write, configure, and execute a Kafka Streams application.
+        </p>
+
+
         <p>
         As we have mentioned above, the computational logic of a Kafka Streams application
is defined as a <a href="#streams_topology">processor topology</a>.
         Currently Kafka Streams provides two sets of APIs to define the processor topology,
which will be described in the subsequent sections.
@@ -157,16 +165,16 @@
         </p>
 
         <pre>
-            public class MyProcessor extends Processor<String, String> {
+            public class MyProcessor extends Processor&lt;String, String&gt; {
                 private ProcessorContext context;
-                private KeyValueStore<String, Integer> kvStore;
+                private KeyValueStore&lt;String, Integer&gt; kvStore;
 
                 @Override
                 @SuppressWarnings("unchecked")
                 public void init(ProcessorContext context) {
                     this.context = context;
                     this.context.schedule(1000);
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                    this.kvStore = (KeyValueStore&lt;String, Integer&gt;) context.getStateStore("Counts");
                 }
 
                 @Override
@@ -186,10 +194,10 @@
 
                 @Override
                 public void punctuate(long timestamp) {
-                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+                    KeyValueIterator&lt;String, Integer&gt; iter = this.kvStore.all();
 
                     while (iter.hasNext()) {
-                        KeyValue<String, Integer> entry = iter.next();
+                        KeyValue&lt;String, Integer&gt; entry = iter.next();
                         context.forward(entry.key, entry.value.toString());
                     }
 
@@ -280,12 +288,52 @@
         To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code>
class, which is extended from the <code>TopologyBuilder</code>.
         A simple example is included with the source code for Kafka in the <code>streams/examples</code>
package. The rest of this section will walk
         through some code to demonstrate the key steps in creating a topology using the Streams
DSL, but we recommend developers to read the full example source
-        codes for details. 
+        code for details.
+
+        <h5><a id="streams_duality" href="#streams_duality">Duality of Streams
and Tables</a></h5>
+
+        <p>
+        Before we discuss concepts such as aggregations in Kafka Streams, we must first introduce
tables, and most importantly the relationship between tables and streams:
+        the so-called <a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying">stream-table
duality</a>.
+        Essentially, this duality means that a stream can be viewed as a table, and vice
versa. Kafka’s log compaction feature, for example, exploits this duality.
+        </p>
+
+        <p>
+        A simple form of a table is a collection of key-value pairs, also called a map or
associative array. Such a table may look as follows:
+        </p>
+        <img class="centered" src="/{{version}}/images/streams-table-duality-01.png">
+
+        The <b>stream-table duality</b> describes the close relationship between
streams and tables.
+        <ul>
+        <li><b>Stream as Table</b>: A stream can be considered a changelog
of a table, where each data record in the stream captures a state change of the table. A stream
is thus a table in disguise, and it can be easily turned into a “real” table by replaying
the changelog from beginning to end to reconstruct the table. Similarly, in a more general
analogy, aggregating data records in a stream – such as computing the total number of pageviews
by user from a stream of pageview events – will return a table (here with the key and the
value being the user and its corresponding pageview count, respectively).</li>
+        <li><b>Table as Stream</b>: A table can be considered a snapshot,
at a point in time, of the latest value for each key in a stream (a stream’s data records
are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into
a “real” stream by iterating over each key-value entry in the table.</li>
+        </ul>
+
+        <p>
+        Let’s illustrate this with an example. Imagine a table that tracks the total number
of pageviews by user (first column of diagram below). Over time, whenever a new pageview event
is processed, the state of the table is updated accordingly. Here, the state changes between
different points in time – and different revisions of the table – can be represented as
a changelog stream (second column).
+        </p>
+        <img class="centered" src="/{{version}}/images/streams-table-duality-02.png">
+
+        <p>
+        Interestingly, because of the stream-table duality, the same stream can be used to
reconstruct the original table (third column):
+        </p>
+        <img class="centered" src="/{{version}}/images/streams-table-duality-03.png">
+
+        <p>
+        The same mechanism is used, for example, to replicate databases via change data capture
(CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for
fault-tolerance.
+        The stream-table duality is such an important concept that Kafka Streams models it
explicitly via the <a href="#streams_kstream_ktable">KStream and KTable</a> interfaces,
which we describe in the next sections.
+        </p>
 
         <h5><a id="streams_kstream_ktable" href="#streams_kstream_ktable">KStream
and KTable</a></h5>
-        The DSL uses two main abstractions. A <b>KStream</b> is an abstraction
of a record stream, where each data record represents a self-contained datum in the unbounded
data set. A <b>KTable</b> is an abstraction of a changelog stream, where each
data record represents an update. More precisely, the value in a data record is considered
to be an update of the last value for the same record key, if any (if a corresponding key
doesn't exist yet, the update will be considered a create). To illustrate the difference between
KStreams and KTables, let’s imagine the following two data records are being sent to the
stream: <code>("alice", 1) --> ("alice", 3)</code>. If these records a KStream
and the stream processing application were to sum the values it would return <code>4</code>.
If these records were a KTable, the return would be <code>3</code>, since the
last record would be considered as an update.
+        The DSL uses two main abstractions. A <b>KStream</b> is an abstraction
of a record stream, where each data record represents a self-contained datum in the unbounded
data set.
+        A <b>KTable</b> is an abstraction of a changelog stream, where each data
record represents an update. More precisely, the value in a data record is considered to be
an update of the last value for the same record key,
+        if any (if a corresponding key doesn't exist yet, the update will be considered a
create). To illustrate the difference between KStreams and KTables, let’s imagine the following
two data records are being sent to the stream:
 
+        <pre>
+            ("alice", 1) --> ("alice", 3)
+        </pre>
 
+        If these records were a KStream and the stream processing application were to sum the
values, it would return <code>4</code>. If these records were a KTable, the return
would be <code>3</code>, since the last record would be considered as an update.
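+
+        <p>
+        As a sketch (the topic name "user-clicks" and the store names are hypothetical), the two
+        interpretations could be expressed as follows:
+        </p>
+        <pre>
+            // Read as a record stream and sum the values per key: "alice" yields 1 + 3 = 4.
+            KStream&lt;String, Integer&gt; asStream =
+                builder.stream(Serdes.String(), Serdes.Integer(), "user-clicks");
+            KTable&lt;String, Integer&gt; sums =
+                asStream.groupByKey().reduce((v1, v2) -> v1 + v2, "clicks-sum");
+
+            // Read as a changelog stream: the later record updates the earlier one, so "alice" is 3.
+            KTable&lt;String, Integer&gt; asTable = builder.table("user-clicks", "clicks-store");
+        </pre>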
 
         <h5><a id="streams_dsl_source" href="#streams_dsl_source">Create Source
Streams from Kafka</a></h5>
 
@@ -298,8 +346,8 @@
         <pre>
             KStreamBuilder builder = new KStreamBuilder();
 
-            KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
-            KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
+            KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1",
"topic2");
+            KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3",
"stateStoreName");
         </pre>
 
         <h5><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing
a stream</a></h5>
@@ -310,7 +358,18 @@
         <li><b>Sliding windows</b> model a fixed-size window that slides
continuously over the time axis; here, two data records are said to be included in the same
window if the difference of their timestamps is within the window size. Thus, sliding windows
are not aligned to the epoch, but on the data record timestamps. In Kafka Streams, sliding
windows are used only for join operations, and can be specified through the <code>JoinWindows</code>
class.</li>
         </ul>
 
-        <h5><a id="streams_dsl_joins" href="#streams_dsl_joins">Joins</a></h5>
+        <p>
+        In the Kafka Streams DSL users can specify a <b>retention period</b>
for the window. This allows Kafka Streams to retain old window buckets for a period of time
in order to wait for the late arrival of records whose timestamps fall within the window interval.
+        If a record arrives after the retention period has passed, the record cannot be processed
and is dropped.
+        </p>
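+
+        <p>
+        For example (a sketch mirroring the windowed aggregate example shown later in this section),
+        a one-day retention period could be specified on the window itself:
+        </p>
+        <pre>
+            KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
+                () -> 0L,  // initial value
+                (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
+                // retain each window's bucket for one day to accommodate late-arriving records
+                TimeWindows.of("counts", 5000L).until(24 * 60 * 60 * 1000L),
+                Serdes.Long() // serde for aggregated value
+            );
+        </pre>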
+
+        <p>
+        Late-arriving records are always possible in real-time data streams. However, how late
records are handled depends on the effective <a href="#streams_time">time semantics</a>.
Using processing-time, the semantics are “when the data is being processed”,
+which means that the notion of late records is not applicable as, by definition,
no record can be late. Hence, late-arriving records can only really be considered as such
(i.e. as arriving “late”) for event-time or ingestion-time semantics. In both cases,
+        Kafka Streams is able to properly handle late-arriving records.
+        </p>
+
+        <h5><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple
streams</a></h5>
         A <b>join</b> operation merges two streams based on the keys of their
data records, and yields a new stream. A join over record streams usually needs to be performed
on a windowing basis because otherwise the number of records that must be maintained for performing
the join may grow indefinitely. In Kafka Streams, you may perform the following join operations:
         <ul>
         <li><b>KStream-to-KStream Joins</b> are always windowed joins,
since otherwise the memory and state required to compute the join would grow infinitely in
size. Here, a newly received record from one of the streams is joined with the other stream's
records within the specified window interval to produce one result for each matching pair
based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code>
instance representing the result stream of the join is returned from this operator.</li>
@@ -320,11 +379,20 @@
         </ul>
 
         Depending on the operands the following join operations are supported: <b>inner
joins</b>, <b>outer joins</b> and <b>left joins</b>. Their semantics
are similar to the corresponding operators in relational databases.
-        a
+
+        <h5><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate
a stream</a></h5>
+        An <b>aggregation</b> operation takes one input stream, and yields a
new stream by combining multiple input records into a single output record. Examples of aggregations
are computing counts or sums. An aggregation over record streams usually needs to be performed
on a windowing basis because otherwise the number of records that must be maintained for performing
the aggregation may grow indefinitely.
+
+        <p>
+        In the Kafka Streams DSL, an input stream of an aggregation can be a <code>KStream</code>
or a <code>KTable</code>, but the output stream will always be a <code>KTable</code>.
+        This allows Kafka Streams to update an aggregate value upon the late arrival of further
records after the value was produced and emitted.
+        When such late arrival happens, the aggregating <code>KStream</code>
or <code>KTable</code> simply emits a new aggregate value. Because the output
is a <code>KTable</code>, the new value is considered to overwrite the old value
with the same key in subsequent processing steps.
+        </p>
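+
+        <p>
+        As a small sketch (with <code>textLines</code> standing in for any <code>KStream</code> of text
+        values, as in the quickstart), a word count illustrates this: the input is a <code>KStream</code>,
+        the output is a <code>KTable</code>, and a late-arriving word simply produces an updated count:
+        </p>
+        <pre>
+            KTable&lt;String, Long&gt; wordCounts = textLines
+                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+                .groupBy((key, word) -> word)
+                .count("Counts");
+        </pre>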
+
         <h5><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform
a stream</a></h5>
 
         <p>
-        There is a list of transformation operations provided for <code>KStream</code>
and <code>KTable</code> respectively.
+        Besides join and aggregation operations, there is a list of other transformation
operations provided for <code>KStream</code> and <code>KTable</code>
respectively.
         Each of these operations may generate either one or more <code>KStream</code>
and <code>KTable</code> objects and
         can be translated into one or more connected processors into the underlying processor
topology.
         All these transformation methods can be chained together to compose a complex processor
topology.
@@ -342,7 +410,7 @@
 
         <pre>
             // written in Java 8+, using lambda expressions
-            KStream<String, GenericRecord> mapped = source1.mapValue(record -> record.get("category"));
+            KStream&lt;String, GenericRecord&gt; mapped = source1.mapValues(record
-> record.get("category"));
         </pre>
 
         <p>
@@ -356,14 +424,14 @@
 
         <pre>
             // written in Java 8+, using lambda expressions
-            KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
+            KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
                 () -> 0L,  // initial value
                 (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
                 TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
                 Serdes.Long() // serde for aggregated value
             );
 
-            KStream<String, String> joined = source1.leftJoin(source2,
+            KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
                 (record1, record2) -> record1.get("user") + "-" + record2.get("region");
             );
         </pre>
@@ -388,7 +456,7 @@
             //
             // joined.to("topic4");
             // materialized = builder.stream("topic4");
-            KStream<String, String> materialized = joined.through("topic4");
+            KStream&lt;String, String&gt; materialized = joined.through("topic4");
         </pre>
 
 
@@ -398,6 +466,7 @@
         in <code>StreamsConfig</code> before running it. A complete list of
         Kafka Streams configs can be found <a href="#streamsconfigs"><b>here</b></a>.
         </p>
-</script>
+</pre>
 
 <!--#include virtual="../includes/_header.htm" -->

