kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3454: add Kafka Streams web docs
Date Fri, 25 Mar 2016 23:05:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 1b1b949b7 -> 496bd3fd4


KAFKA-3454: add Kafka Streams web docs

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Gwen Shapira

Closes #1127 from guozhangwang/KStreamsDocs

(cherry picked from commit 23b50093f4100ce7fbff325cdc92ee6cf3c54102)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/496bd3fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/496bd3fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/496bd3fd

Branch: refs/heads/0.10.0
Commit: 496bd3fd41c0272b7ef77652b85cbaae748e100e
Parents: 1b1b949
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Mar 25 16:04:58 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Mar 25 16:05:07 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    | 123 +++----
 docs/configuration.html                         |   5 +
 docs/documentation.html                         |  15 +
 docs/quickstart.html                            | 109 ++++++
 docs/streams.html                               | 341 +++++++++++++++++++
 .../examples/pageview/PageViewTypedDemo.java    |   3 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   3 +-
 .../kafka/streams/examples/pipe/PipeDemo.java   |   3 +-
 .../examples/wordcount/WordCountDemo.java       |   3 +-
 .../wordcount/WordCountProcessorDemo.java       |   3 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  12 +-
 11 files changed, 546 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index c29ad5a..13a8b4e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -413,7 +413,7 @@ project(':core') {
 
   task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
                                'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
-                               ':connect:runtime:genConnectConfigDocs'], type: Tar) {
+                               ':connect:runtime:genConnectConfigDocs', ':streams:genStreamsConfigDocs'],
type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
     from project.file("../docs")
@@ -552,77 +552,84 @@ project(':clients') {
 }
 
 project(':tools') {
-    archivesBaseName = "kafka-tools"
+  archivesBaseName = "kafka-tools"
 
-    dependencies {
-        compile project(':clients')
-        compile project(':log4j-appender')
-        compile libs.argparse4j
-        compile libs.jacksonDatabind
-        compile libs.slf4jlog4j
+  dependencies {
+    compile project(':clients')
+    compile project(':log4j-appender')
+    compile libs.argparse4j
+    compile libs.jacksonDatabind
+    compile libs.slf4jlog4j
 
-        testCompile project(':clients')
-        testCompile libs.junit
-    }
+    testCompile project(':clients')
+    testCompile libs.junit
+  }
 
-    javadoc {
-        include "**/org/apache/kafka/tools/*"
-    }
+  javadoc {
+    include "**/org/apache/kafka/tools/*"
+  }
 
-    tasks.create(name: "copyDependantLibs", type: Copy) {
-        from (configurations.testRuntime) {
-            include('slf4j-log4j12*')
-        }
-        from (configurations.runtime) {
-            exclude('kafka-clients*')
-        }
-        into "$buildDir/dependant-libs-${versions.scala}"
-        duplicatesStrategy 'exclude'
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
     }
-
-    jar {
-        dependsOn 'copyDependantLibs'
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
     }
+    into "$buildDir/dependant-libs-${versions.scala}"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
 }
 
 project(':streams') {
-    archivesBaseName = "kafka-streams"
-
-    dependencies {
-        compile project(':clients')
-        compile project(':connect:json')  // this dependency should be removed after we unify
data API
-        compile libs.slf4jlog4j
-        compile libs.rocksDBJni
-        compile libs.zkclient // this dependency should be removed after KIP-4
-        compile libs.jacksonDatabind // this dependency should be removed after KIP-4
-
-        testCompile project(':clients').sourceSets.test.output
-        testCompile libs.junit
-    }
+  archivesBaseName = "kafka-streams"
 
-    javadoc {
-        include "**/org/apache/kafka/streams/**"
-        exclude "**/internals/**"
-    }
+  dependencies {
+    compile project(':clients')
+    compile project(':connect:json')  // this dependency should be removed after we unify
data API
+    compile libs.slf4jlog4j
+    compile libs.rocksDBJni
+    compile libs.zkclient // this dependency should be removed after KIP-4
+    compile libs.jacksonDatabind // this dependency should be removed after KIP-4
 
-    tasks.create(name: "copyDependantLibs", type: Copy) {
-        from (configurations.testRuntime) {
-            include('slf4j-log4j12*')
-        }
-        from (configurations.runtime) {
-            exclude('kafka-clients*')
-        }
-        into "$buildDir/dependant-libs-${versions.scala}"
-        duplicatesStrategy 'exclude'
-    }
+    testCompile project(':clients').sourceSets.test.output
+    testCompile libs.junit
+  }
 
-    jar {
-        dependsOn 'copyDependantLibs'
-    }
+  javadoc {
+    include "**/org/apache/kafka/streams/**"
+    exclude "**/internals/**"
+  }
 
-    systemTestLibs {
-        dependsOn testJar
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
     }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+    }
+    into "$buildDir/dependant-libs-${versions.scala}"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+
+  task genStreamsConfigDocs(type: JavaExec) {
+    classpath = sourceSets.main.runtimeClasspath
+    main = 'org.apache.kafka.streams.StreamsConfig'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream()
+  }
 }
 
 project(':streams:examples') {

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/docs/configuration.html
----------------------------------------------------------------------
diff --git a/docs/configuration.html b/docs/configuration.html
index a89778d..e5280a5 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -333,4 +333,9 @@ Since 0.9.0.0 we have been working on a replacement for our existing simple
and
 <!--#include virtual="generated/consumer_config.html" -->
 
 <h3><a id="connectconfigs" href="#connectconfigs">3.4 Kafka Connect Configs</a></h3>
+Below is the configuration of the Kafka Connect framework.
 <!--#include virtual="generated/connect_config.html" -->
+
+<h3><a id="streamsconfigs" href="#streamsconfigs">3.5 Kafka Streams Configs</a></h3>
+Below is the configuration of the Kafka Streams client library.
+<!--#include virtual="generated/streams_config.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/docs/documentation.html
----------------------------------------------------------------------
diff --git a/docs/documentation.html b/docs/documentation.html
index 67a2954..70002ab 100644
--- a/docs/documentation.html
+++ b/docs/documentation.html
@@ -52,6 +52,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>,
<a href="/08/documen
                     <li><a href="#newconsumerconfigs">3.3.2 New Consumer Configs</a>
                 </ul>
             <li><a href="#connectconfigs">3.4 Kafka Connect Configs</a>
+            <li><a href="#streamsconfigs">3.5 Kafka Streams Configs</a>
         </ul>
     </li>
     <li><a href="#design">4. Design</a>
@@ -136,6 +137,17 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>,
<a href="/08/documen
             <li><a href="#connect_development">8.3 Connector Development Guide</a></li>
         </ul>
     </li>
+    <li><a href="#streams">9. Kafka Streams</a>
+        <ul>
+            <li><a href="#streams_overview">9.1 Overview</a></li>
+            <li><a href="#streams_developer">9.2 Developer Guide</a></li>
+            <ul>
+                <li><a href="#streams_concepts">Core Concepts</a></li>
+                <li><a href="#streams_processor">Low-Level Processor API</a></li>
+                <li><a href="#streams_dsl">High-Level Streams DSL</a></li>
+            </ul>
+        </ul>
+    </li>
 </ul>
 
 <h2><a id="gettingStarted" href="#gettingStarted">1. Getting Started</a></h2>
@@ -171,4 +183,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>,
<a href="/08/documen
 <h2><a id="connect" href="#connect">8. Kafka Connect</a></h2>
 <!--#include virtual="connect.html" -->
 
+<h2><a id="streams" href="#streams">9. Kafka Streams</a></h2>
+<!--#include virtual="streams.html" -->
+
 <!--#include virtual="../includes/footer.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/docs/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 1e7b62c..7a923c6 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -249,3 +249,112 @@ The connectors continue to process data, so we can add data to the file
and see
 </pre>
 
 You should see the line appear in the console consumer output and in the sink file.
+
+<h4><a id="quickstart_kafkastreams" href="#quickstart_kafkastreams">Step 8: Use
Kafka Streams to process data</a></h4>
+
+<p>
+Kafka Streams is a client library of Kafka for real-time stream processing and analyzing
data stored in Kafka brokers.
+This quickstart example will demonstrate how to run a streaming application coded in this
library. Here is the gist
+of the <code>WordCountDemo</code> example code (converted to use Java 8 lambda
expressions for easy reading).
+</p>
+<pre>
+KStream<String, Long> wordCounts = textLines
+// Split each text line, by whitespace, into words.
+.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+// Ensure the words are available as message keys for the next aggregate operation.
+.map((key, value) -> new KeyValue<>(value, value))
+// Count the occurrences of each word (message key).
+.countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts")
+// Convert the resulted aggregate table into another stream.
+.toStream();
+</pre>
+
+<p>
+It implements the WordCount
+algorithm, which computes a word occurrence histogram from the input text. However, unlike
other WordCount examples
+you might have seen before that operate on bounded data, the WordCount demo application behaves
slightly differently because it is
+designed to operate on an <b>infinite, unbounded stream</b> of data. Similar
to the bounded variant, it is a stateful algorithm that
+tracks and updates the counts of words. However, since it must assume potentially
+unbounded input data, it will periodically output its current state and results while continuing
to process more data
+because it cannot know when it has processed "all" the input data.
+</p>
+<p>
+We will now prepare input data to a Kafka topic, which will subsequently be processed by a Kafka
Streams application.
+</p>
+
+<!--
+<pre>
+&gt; <b>./bin/kafka-topics --create \</b>
+            <b>--zookeeper localhost:2181 \</b>
+            <b>--replication-factor 1 \</b>
+            <b>--partitions 1 \</b>
+            <b>--topic streams-file-input</b>
+
+</pre>
+
+-->
+
+<pre>
+&gt; <b>echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit"
> file-input.txt</b>
+</pre>
+
+<p>
+Next, we send this input data to the input topic named <b>streams-file-input</b>
using the console producer (in practice,
+stream data will likely be flowing continuously into Kafka where the application will be
up and running):
+</p>
+
+<pre>
+&gt; <b>cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092
--topic streams-file-input</b>
+</pre>
+
+<p>
+We can now run the WordCount demo application to process the input data:
+</p>
+
+<pre>
+&gt; <b>./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo</b>
+</pre>
+
+<p>
+There won't be any STDOUT output except log entries as the results are continuously written
back into another topic named <b>streams-wordcount-output</b> in Kafka.
+The demo will run for a few seconds and then, unlike typical stream processing applications,
terminate automatically.
+</p>
+<p>
+We can now inspect the output of the WordCount demo application by reading from its output
topic:
+</p>
+
+<pre>
+&gt; <b>./bin/kafka-console-consumer --zookeeper localhost:2181 \</b>
+            <b>--topic streams-wordcount-output \</b>
+            <b>--from-beginning \</b>
+            <b>--formatter kafka.tools.DefaultMessageFormatter \</b>
+            <b>--property print.key=true \</b>
+            <b>--property print.value=true \</b>
+            <b>--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
\</b>
+            <b>--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer</b>
+</pre>
+
+<p>
+with the following output data being printed to the console (You can stop the console consumer
via <b>Ctrl-C</b>):
+</p>
+
+<pre>
+all     1
+streams 1
+lead    1
+to      1
+kafka   1
+hello   1
+kafka   2
+streams 2
+join    1
+kafka   3
+summit  1
+<b>^C</b>
+</pre>
+
+<p>
+Here, the first column is the Kafka message key, and the second column is the message value,
both in <code>java.lang.String</code> format.
+Note that the output is actually a continuous stream of updates, where each data record (i.e.
each line in the original output above) is
+an updated count of a single word, aka record key such as "kafka". For multiple records with
the same key, each later record is an update of the previous one.
+</p>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/docs/streams.html
----------------------------------------------------------------------
diff --git a/docs/streams.html b/docs/streams.html
new file mode 100644
index 0000000..9b94bb3
--- /dev/null
+++ b/docs/streams.html
@@ -0,0 +1,341 @@
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~-->
+
+<h3><a id="streams_overview" href="#streams_overview">9.1 Overview</a></h3>
+
+<p>
+Kafka Streams is a client library for processing and analyzing data stored in Kafka and either
writing the resulting data back to Kafka or sending the final output to an external system. It
builds upon important stream processing concepts such as properly distinguishing between event
time and processing time, windowing support, and simple yet efficient management of application
state.
+Kafka Streams has a <b>low barrier to entry</b>: You can quickly write and run
a small-scale proof-of-concept on a single machine; and you only need to run additional instances
of your application on multiple machines to scale up to high-volume production workloads.
Kafka Streams transparently handles the load balancing of multiple instances of the same application
by leveraging Kafka's parallelism model.
+</p>
+<p>
+Some highlights of Kafka Streams:
+</p>
+
+<ul>
+    <li>Designed as a <b>simple and lightweight client library</b>, which
can be easily embedded in any Java application and integrated with any existing packaging,
deployment and operational tools that users have for their streaming applications.</li>
+    <li>Has <b>no external dependencies on systems other than Apache Kafka itself</b>
as the internal messaging layer; notably, it uses Kafka's partitioning model to horizontally
scale processing while maintaining strong ordering guarantees.</li>
+    <li>Supports <b>fault-tolerant local state</b>, which enables very
fast and efficient stateful operations like joins and windowed aggregations.</li>
+    <li>Employs <b>one-record-at-a-time processing</b> to achieve low processing
latency, and supports <b>event-time based windowing operations</b>.</li>
+    <li>Offers necessary stream processing primitives, along with a <b>high-level
Streams DSL</b> and a <b>low-level Processor API</b>.</li>
+
+</ul>
+
+<h3><a id="streams_developer" href="#streams_developer">9.2 Developer Guide</a></h3>
+
+<p>
+There is a <a href="#quickstart_kafkastreams">quickstart</a> example that shows
how to run a stream processing program coded in the Kafka Streams library.
+This section focuses on how to write, configure, and execute a Kafka Streams application.
+</p>
+
+<h4><a id="streams_concepts" href="#streams_concepts">Core Concepts</a></h4>
+
+<p>
+We first summarize the key concepts of Kafka Streams.
+</p>
+
+<h5><a id="streams_topology" href="#streams_topology">Stream Processing Topology</a></h5>
+
+<ul>
+    <li>A <b>stream</b> is the most important abstraction provided by Kafka
Streams: it represents an unbounded, continuously updating data set. A stream is an ordered,
replayable, and fault-tolerant sequence of immutable data records, where a <b>data record</b>
is defined as a key-value pair.</li>
+    <li>A stream processing application written in Kafka Streams defines its computational
logic through one or more <b>processor topologies</b>, where a processor topology
is a graph of stream processors (nodes) that are connected by streams (edges).</li>
+    <li>A <b>stream processor</b> is a node in the processor topology;
it represents a processing step to transform data in streams by receiving one input record
at a time from its upstream processors in the topology, applying its operation to it, and
may subsequently produce one or more output records to its downstream processors.</li>
+</ul>
+
+<p>
+Kafka Streams offers two ways to define the stream processing topology: the <a href="#streams_dsl"><b>Kafka
Streams DSL</b></a> provides
+the most common data transformation operations such as <code>map</code> and <code>filter</code>;
the lower-level <a href="#streams_processor"><b>Processor API</b></a>
allows
+developers to define and connect custom processors as well as to interact with <a href="#streams_state">state
stores</a>.
+</p>
+
+<h5><a id="streams_time" href="#streams_time">Time</a></h5>
+
+<p>
+A critical aspect in stream processing is the notion of <b>time</b>, and
how it is modeled and integrated.
+For example, some operations such as <b>windowing</b> are defined based on time
boundaries.
+</p>
+<p>
+Common notions of time in streams are:
+</p>
+
+<ul>
+    <li><b>Event time</b> - The point in time when an event or data record
occurred, i.e. was originally created "at the source".</li>
+    <li><b>Processing time</b> - The point in time when the event or data
record happens to be processed by the stream processing application, i.e. when the record
is being consumed. The processing time may be milliseconds, hours, or days etc. later than
the original event time.</li>
+</ul>
+
+<p>
+Kafka Streams assigns a <b>timestamp</b> to every data record
+via the <code>TimestampExtractor</code> interface.
+Concrete implementations of this interface may retrieve or compute timestamps based on the
actual contents of data records such as an embedded timestamp field
+to provide event-time semantics, or use any other approach such as returning the current
wall-clock time at the time of processing,
+thereby yielding processing-time semantics to stream processing applications.
+Developers can thus enforce different notions of time depending on their business needs.
For example,
+per-record timestamps describe the progress of a stream with regards to time (although records
may be out-of-order within the stream) and
+are leveraged by time-dependent operations such as joins.
+</p>
+
+<h5><a id="streams_state" href="#streams_state">States</a></h5>
+
+<p>
+Some stream processing applications don't require state, which means the processing of a
message is independent from
+the processing of all other messages.
+However, being able to maintain state opens up many possibilities for sophisticated stream
processing applications: you
+can join input streams, or group and aggregate data records. Many such stateful operators
are provided by the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a>.
+</p>
+<p>
+Kafka Streams provides so-called <b>state stores</b>, which can be used by stream
processing applications to store and query data.
+This is an important capability when implementing stateful operations.
+Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs
to store and query data required for processing.
+These state stores can either be a persistent key-value store, an in-memory hashmap, or another
convenient data structure.
+Kafka Streams offers fault-tolerance and automatic recovery for local state stores.
+</p>
+<br>
+<p>
+As we have mentioned above, the computational logic of a Kafka Streams application is defined
as a <a href="#streams_topology">processor topology</a>.
+Currently Kafka Streams provides two sets of APIs to define the processor topology, which
will be described in the subsequent sections.
+</p>
+
+<h4><a id="streams_processor" href="#streams_processor">Low-Level Processor API</a></h4>
+
+<h5><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h5>
+
+<p>
+Developers can define their customized processing logic by implementing the <code>Processor</code>
interface, which
+provides <code>process</code> and <code>punctuate</code> methods.
The <code>process</code> method is performed on each
+of the received records; and the <code>punctuate</code> method is performed periodically
based on elapsed time.
+In addition, the processor can maintain the current <code>ProcessorContext</code>
instance variable initialized in the
+<code>init</code> method, and use the context to schedule the punctuation period
(<code>context().schedule</code>), to
+forward the modified / new key-value pair to downstream processors (<code>context().forward</code>),
to commit the current
+processing progress (<code>context().commit</code>), etc.
+</p>
+
+<pre>
+    public class MyProcessor extends Processor<String, String> {
+        private ProcessorContext context;
+        private KeyValueStore<String, Integer> kvStore;
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void init(ProcessorContext context) {
+            this.context = context;
+            this.context.schedule(1000);
+            this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+        }
+
+        @Override
+        public void process(String dummy, String line) {
+            String[] words = line.toLowerCase().split(" ");
+
+            for (String word : words) {
+                Integer oldValue = this.kvStore.get(word);
+
+                if (oldValue == null) {
+                    this.kvStore.put(word, 1);
+                } else {
+                    this.kvStore.put(word, oldValue + 1);
+                }
+            }
+        }
+
+        @Override
+        public void punctuate(long timestamp) {
+            KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+            while (iter.hasNext()) {
+                KeyValue<String, Integer> entry = iter.next();
+                context.forward(entry.key, entry.value.toString());
+            }
+
+            iter.close();
+            context.commit();
+        }
+
+        @Override
+        public void close() {
+            this.kvStore.close();
+        }
+    };
+</pre>
+
+<p>
+In the above implementation, the following actions are performed:
+
+<ul>
+    <li>In the <code>init</code> method, schedule the punctuation every
1 second and retrieve the local state store by its name "Counts".</li>
+    <li>In the <code>process</code> method, upon each received record,
split the value string into words, and update their counts into the state store (we will talk
about this feature later in the section).</li>
+    <li>In the <code>punctuate</code> method, iterate the local state store
and send the aggregated counts to the downstream processor, and commit the current stream
state.</li>
+</ul>
+</p>
+
+<h5><a id="streams_processor_topology" href="#streams_processor_topology">Processor
Topology</a></h5>
+
+<p>
+With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code>
to build a processor topology
+by connecting these processors together:
+
+<pre>
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.addSource("SOURCE", "src-topic")
+
+        .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate
MyProcessor1 */, "SOURCE")
+        .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate
MyProcessor2 */, "PROCESS1")
+        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate
MyProcessor3 */, "PROCESS1")
+
+        .addSink("SINK1", "sink-topic1", "PROCESS1")
+        .addSink("SINK2", "sink-topic2", "PROCESS2")
+        .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
+
+There are several steps in the above code to build the topology, and here is a quick walk
through:
+
+<ul>
+    <li>First of all a source node named "SOURCE" is added to the topology using the
<code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li>
+    <li>Three processor nodes are then added using the <code>addProcessor</code>
method; here the first processor is a child of the "SOURCE" node, but is the parent of the
other two processors.</li>
+    <li>Finally three sink nodes are added to complete the topology using the <code>addSink</code>
method, each piping from a different parent processor node and writing to a separate topic.</li>
+</ul>
+</p>
+
+<h5><a id="streams_processor_statestore" href="#streams_processor_statestore">Local
State Store</a></h5>
+
+<p>
+Note that the Processor API is not limited to only accessing the current records as they
arrive, but can also maintain local state stores
+that keep recently arrived records to use in stateful processing operations such as aggregation
or windowed joins.
+To take advantage of these local states, developers can use the <code>TopologyBuilder.addStateStore</code>
method when building the
+processor topology to create the local state and associate it with the processor nodes that
need to access it; or they can connect a created
+local state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+
+<pre>
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.addSource("SOURCE", "src-topic")
+
+        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
+        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
+        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(),
"PROCESS1")
+        .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate
MyProcessor3 */, "PROCESS1")
+        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate
MyProcessor3 */, "PROCESS1")
+
+        // connect the state store "COUNTS" with processor "PROCESS2"
+        .connectProcessorAndStateStores("PROCESS2", "COUNTS");
+
+        .addSink("SINK1", "sink-topic1", "PROCESS1")
+        .addSink("SINK2", "sink-topic2", "PROCESS2")
+        .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
+
+</p>
+
+In the next section we present another way to build the processor topology: the Kafka Streams
DSL.
+
+<h4><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h4>
+
+To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code>
class, which is extended from the <code>TopologyBuilder</code>.
+A simple example is included with the source code for Kafka in the <code>streams/examples</code>
package. The rest of this section will walk
+through some code to demonstrate the key steps in creating a topology using the Streams DSL,
but we recommend developers to read the full example source
+codes for details.
+
+<h5><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams
from Kafka</a></h5>
+
+<p>
+Either a <b>record stream</b> (defined as <code>KStream</code>) or
a <b>changelog stream</b> (defined as <code>KTable</code>)
+can be created as a source stream from one or more Kafka topics (for <code>KTable</code>
you can only create the source stream
+from a single topic).
+</p>
+
+<pre>
+    KStreamBuilder builder = new KStreamBuilder();
+
+    KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
+    KTable<String, GenericRecord> source2 = builder.table("topic3");
+</pre>
+
+<h5><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h5>
+
+<p>
+There is a list of transformation operations provided for <code>KStream</code>
and <code>KTable</code> respectively.
+Each of these operations may generate one or more <code>KStream</code>
and <code>KTable</code> objects and
+can be translated into one or more connected processors in the underlying processor topology.
+All these transformation methods can be chained together to compose a complex processor topology.
+Since <code>KStream</code> and <code>KTable</code> are strongly typed,
all these transformation operations are defined as
+generic functions where users can specify the input and output data types.
+</p>
+
+<p>
+Among these transformations, <code>filter</code>, <code>map</code>,
<code>mapValues</code>, etc., are stateless
+transformation operations and can be applied to both <code>KStream</code> and
<code>KTable</code>,
+where users can usually pass a customized function to these functions as a parameter, such
as <code>Predicate</code> for <code>filter</code>,
+<code>KeyValueMapper</code> for <code>map</code>, etc:
+
+</p>
+
+<pre>
+    // written in Java 8+, using lambda expressions
+    KStream<String, GenericRecord> mapped = source1.mapValues(record -> record.get("category"));
+</pre>
+
+<p>
+Stateless transformations, by definition, do not depend on any state for processing, and
hence implementation-wise
+they do not require a state store associated with the stream processor; stateful transformations,
on the other hand,
+require accessing an associated state for processing and producing outputs.
+For example, in <code>join</code> and <code>aggregate</code> operations,
a windowing state is usually used to store all the received records
+within the defined window boundary so far. The operators can then access these accumulated
records in the store and compute
+based on them.
+</p>
+
+<pre>
+    // written in Java 8+, using lambda expressions
+    KTable<Windowed<String>, Long> counts = source1.aggregateByKey(
+        () -> 0L,  // initial value
+        (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
+        HoppingWindows.of("counts").with(5000L).every(1000L)  // intervals in milliseconds
+    );
+
+    KStream<String, String> joined = source1.leftJoin(source2,
+        (record1, record2) -> record1.get("user") + "-" + record2.get("region")
+    );
+</pre>
+
+<h5><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h5>
+
+<p>
+At the end of the processing, users can choose to (continuously) write the final resulting
streams back to a Kafka topic through
+<code>KStream.to</code> and <code>KTable.to</code>.
+</p>
+
+<pre>
+    joined.to("topic4");
+</pre>
+
+If your application needs to continue reading and processing the records after they have
been materialized
+to a topic via <code>to</code> above, one option is to construct a new stream
that reads from the output topic;
+Kafka Streams provides a convenience method called <code>through</code>:
+
+<pre>
+    // equivalent to
+    //
+    // joined.to("topic4");
+    // materialized = builder.stream("topic4");
+    KStream<String, String> materialized = joined.through("topic4");
+</pre>
+
+
+<br>
+<p>
+Besides defining the topology, developers will also need to configure their applications
+in <code>StreamsConfig</code> before running them. A complete list of
+Kafka Streams configs can be found <a href="#streamsconfigs"><b>here</b></a>.
+</p>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 0385bde..4124b32 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.examples.pageview;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -86,7 +87,7 @@ public class PageViewTypedDemo {
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 6f5cdf2..e61842f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.examples.pageview;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -62,7 +63,7 @@ public class PageViewUntypedDemo {
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 619f33d..3c1bd8c 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.examples.pipe;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KafkaStreams;
@@ -44,7 +45,7 @@ public class PipeDemo {
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index e892abb..c12977f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.examples.wordcount;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -52,7 +53,7 @@ public class WordCountDemo {
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 8457415..a5cddfd 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.examples.wordcount;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -111,7 +112,7 @@ public class WordCountProcessorDemo {
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         TopologyBuilder builder = new TopologyBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/496bd3fd/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d4efbee..3e0f955 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -34,7 +34,6 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * Configuration for Kafka Streams. Documentation for these configurations can be found in
the <a
@@ -115,9 +114,6 @@ public class StreamsConfig extends AbstractConfig {
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
-    /** <code>auto.offset.reset</code> */
-    public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-
     static {
         CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // required with no default
value
                                         Type.STRING,
@@ -197,12 +193,6 @@ public class StreamsConfig extends AbstractConfig {
                                         60000,
                                         Importance.LOW,
                                         STATE_CLEANUP_DELAY_MS_DOC)
-                                .define(AUTO_OFFSET_RESET_CONFIG,
-                                        Type.STRING,
-                                        "latest",
-                                        in("latest", "earliest", "none"),
-                                        Importance.MEDIUM,
-                                        ConsumerConfig.AUTO_OFFSET_RESET_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
                                         Type.LIST,
                                         "",
@@ -277,7 +267,7 @@ public class StreamsConfig extends AbstractConfig {
         Map<String, Object> props = this.originals();
 
         // remove consumer properties that are not required for producers
-        props.remove(StreamsConfig.AUTO_OFFSET_RESET_CONFIG);
+        props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
 
         // remove streams properties
         removeStreamsSpecificConfigs(props);


Mime
View raw message