kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: doc changes for KIP-138
Date Wed, 30 Aug 2017 16:45:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6682abe4a -> f5310d645


MINOR: doc changes for KIP-138

1. Core concepts (added the stream time definition), upgrade guide and developer guide.
2. Related Java docs changes.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3732 from guozhangwang/KMinor-kip138-docs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f5310d64
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f5310d64
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f5310d64

Branch: refs/heads/trunk
Commit: f5310d645c4920d37925e82edb07f1e4a582b0b2
Parents: 6682abe
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Aug 30 09:45:16 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 30 09:45:16 2017 -0700

----------------------------------------------------------------------
 docs/streams/core-concepts.html                 | 18 +++++-----
 docs/streams/developer-guide.html               | 37 ++++++++++----------
 docs/streams/upgrade-guide.html                 | 19 +++++++++-
 docs/upgrade.html                               |  2 +-
 .../apache/kafka/streams/kstream/KStream.java   | 37 +++++++-------------
 5 files changed, 59 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f5310d64/docs/streams/core-concepts.html
----------------------------------------------------------------------
diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index 110ee8e..baf8d9b 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -95,14 +95,14 @@
         The choice between event-time and ingestion-time is actually done through the configuration
of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded
into Kafka messages. Depending on Kafka's configuration these timestamps represent event-time
or ingestion-time. The respective Kafka configuration setting can be specified on the broker
level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded
timestamps as-is. Hence, the effective time semantics of your application depend on the effective
Kafka configuration for these embedded timestamps.
     </p>
     <p>
-        Kafka Streams assigns a <b>timestamp</b> to every data record
-        via the <code>TimestampExtractor</code> interface.
-        Concrete implementations of this interface may retrieve or compute timestamps based
on the actual contents of data records such as an embedded timestamp field
-        to provide event-time semantics, or use any other approach such as returning the
current wall-clock time at the time of processing,
-        thereby yielding processing-time semantics to stream processing applications.
-        Developers can thus enforce different notions of time depending on their business
needs. For example,
-        per-record timestamps describe the progress of a stream with regards to time (although
records may be out-of-order within the stream) and
-        are leveraged by time-dependent operations such as joins.
+        Kafka Streams assigns a <b>timestamp</b> to every data record via the <code>TimestampExtractor</code> interface.
+        These per-record timestamps describe the progress of a stream with regard to time and are leveraged by time-dependent operations such as window operations.
+        As a result, this time only advances when a new record arrives at the processor.
+        We call this data-driven time the <b>stream time</b> of the application, to differentiate it from the <b>wall-clock time</b> at which the application is actually executing.
+        Concrete implementations of the <code>TimestampExtractor</code> interface then give different semantics to stream time:
+        for example, retrieving or computing timestamps based on the actual contents of data records, such as an embedded timestamp field, provides event-time semantics,
+        while returning the current wall-clock time yields processing-time semantics for stream time.
+        Developers can thus enforce different notions of time depending on their business needs.
     </p>
 
     <p>
@@ -111,7 +111,7 @@
 
     <ul>
         <li> When new output records are generated via processing some input record,
for example, <code>context.forward()</code> triggered in the <code>process()</code>
function call, output record timestamps are inherited from input record timestamps directly.</li>
-        <li> When new output records are generated via periodic functions such as <code>punctuate()</code>,
the output record timestamp is defined as the current internal time (obtained through <code>context.timestamp()</code>)
of the stream task.</li>
+        <li> When new output records are generated via periodic functions such as <code>Punctuator#punctuate()</code>,
the output record timestamp is defined as the current internal time (obtained through <code>context.timestamp()</code>)
of the stream task.</li>
         <li> For aggregations, the timestamp of a resulting aggregate update record
will be that of the latest arrived input record that triggered the update.</li>
     </ul>
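
As an illustration of the stream-time discussion above, here is a minimal sketch of a custom TimestampExtractor. It assumes the two-argument extract() signature of this API era and ConsumerRecord#timestamp(); the fall-back-to-wall-clock policy is purely an assumption for the example.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// A minimal sketch: use the timestamp embedded in the record when it looks valid
// (event-time or ingestion-time, depending on broker/topic configuration), otherwise
// fall back to wall-clock time. The fallback policy is illustrative only.
public class EmbeddedOrWallClockTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long embedded = record.timestamp();
        if (embedded >= 0) {
            return embedded;               // event-time (or ingestion-time) semantics
        }
        return System.currentTimeMillis(); // processing-time semantics as a fallback
    }
}
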
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5310d64/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 6d31f34..3af0175 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -64,8 +64,23 @@
     // keep the processor context locally because we need it in punctuate() and commit()
     this.context = context;
 
-    // call this processor's punctuate() method every 1000 milliseconds.
-    this.context.schedule(1000);
+    // schedule a punctuation method every 1000 milliseconds.
+    this.context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator() {
+        @Override
+        public void punctuate(long timestamp) {
+            KeyValueIterator&lt;String, Long&gt; iter = kvStore.all();
+
+            while (iter.hasNext()) {
+                KeyValue&lt;String, Long&gt; entry = iter.next();
+                context.forward(entry.key, entry.value.toString());
+            }
+
+            iter.close();
+
+            // commit the current processing progress
+            context.commit();
+        }
+    });
 
     // retrieve the key-value store named "Counts"
     this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
@@ -87,20 +102,6 @@
     }
 
     @Override
-    public void punctuate(long timestamp) {
-    KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
-
-    while (iter.hasNext()) {
-        KeyValue&lt;String, Long&gt; entry = iter.next();
-        context.forward(entry.key, entry.value.toString());
-    }
-
-    iter.close();
-    // commit the current processing progress
-    context.commit();
-    }
-
-    @Override
     public void close() {
     // close any resources managed by this processor.
     // Note: Do not close any StateStores as these are managed
@@ -116,7 +117,7 @@
     <ul>
         <li>In the <code>init</code> method, schedule the punctuation every
1 second and retrieve the local state store by its name "Counts".</li>
         <li>In the <code>process</code> method, upon each received record,
split the value string into words, and update their counts into the state store (we will talk
about this feature later in the section).</li>
-        <li>In the <code>punctuate</code> method, iterate the local state
store and send the aggregated counts to the downstream processor, and commit the current stream
state.</li>
+        <li>In the scheduled <code>punctuate</code> method, iterate the
local state store and send the aggregated counts to the downstream processor, and commit the
current stream state.</li>
     </ul>
 
 
@@ -365,7 +366,7 @@ In addition, users can add <em>source processors</em> with
the specified Kafka t
 
 <p>
 In this topology, the &quot;Process&quot; stream processor node is considered a downstream
processor of the &quot;Source&quot; node, and an upstream processor of the &quot;Sink&quot;
node. 
-As a result, whenever the &quot;Source&quot; node forward a newly fetched record
from Kafka to its downstream &quot;Process&quot; node, <code>WordCountProcessor#process()</code>
method is triggered to process the record and update the associated state store; and whenever
<code>context#forward()</code> is called in the <code>WordCountProcessor#punctuate()</code>
method, the aggregate key-value pair will be sent via the &quot;Sink&quot; processor
node to the Kafka topic &quot;sink-topic&quot;. 
+As a result, whenever the &quot;Source&quot; node forwards a newly fetched record
from Kafka to its downstream &quot;Process&quot; node, the <code>WordCountProcessor#process()</code>
method is triggered to process the record and update the associated state store; and whenever
<code>context#forward()</code> is called in the <code>Punctuator#punctuate()</code>
method, the aggregate key-value pair will be sent via the &quot;Sink&quot; processor
node to the Kafka topic &quot;sink-topic&quot;.
 Note that in the <code>WordCountProcessor</code> implementation, users need to
refer to the same store name &quot;Counts&quot; when accessing the key-value store;
otherwise an exception will be thrown at runtime, indicating that the state store cannot be
found; also, if the state store itself is not associated with the processor in the <code>Topology</code>
code, accessing it in the processor's <code>init()</code> method will also throw
an exception at runtime, indicating the state store is not accessible from this processor.
 </p>
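
Putting the developer-guide snippet above together, the following is a hedged sketch of a complete processor using the new punctuation API. It assumes the KIP-138 ProcessorContext#schedule overload returns a Cancellable handle, reuses the PunctuationType constant name shown in this changeset, and mirrors the word-count logic of the example above.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch in the spirit of the WordCountProcessor above, assuming the KIP-138
// schedule() overload returns a Cancellable that can be cancelled in close().
public class WordCountEmittingProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;
    private Cancellable punctuation;   // handle to the scheduled punctuator (assumed return type)

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");

        // emit the current counts downstream every 1000 ms of wall-clock time,
        // then commit the processing progress
        this.punctuation = context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                try (final KeyValueIterator<String, Long> iter = kvStore.all()) {
                    while (iter.hasNext()) {
                        final KeyValue<String, Long> entry = iter.next();
                        context.forward(entry.key, entry.value.toString());
                    }
                }
                context.commit();
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        // split the value into words and update their counts in the state store
        for (final String word : value.toLowerCase().split(" ")) {
            final Long oldCount = kvStore.get(word);
            kvStore.put(word, oldCount == null ? 1L : oldCount + 1L);
        }
    }

    @Override
    @Deprecated
    public void punctuate(final long timestamp) {
        // deprecated by KIP-138; periodic work now lives in the Punctuator registered in init()
    }

    @Override
    public void close() {
        punctuation.cancel();   // stop the scheduled punctuation when the processor is closed
    }
}
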
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5310d64/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index bd0bb8a..7f695f5 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -45,7 +45,24 @@
     </p>
 
     <h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams
API changes in 1.0.0</a></h3>
-    <!-- TODO: KIP-120, 138, 160, 161, 167, 173 -->
+    <!-- TODO: KIP-120, 160, 161, 167, 173 -->
+
+    <p>
+        The Processor API was extended to allow users to schedule <code>punctuate</code> functions based either on data-driven <b>stream time</b> or on wall-clock time.
+        As a result, the original <code>ProcessorContext#schedule</code> is deprecated in favor of a new overloaded function that accepts a user-customizable <code>Punctuator</code> callback interface, whose <code>punctuate</code> method is triggered periodically based on the <code>PunctuationType</code>.
+        The <code>PunctuationType</code> determines what notion of time is used for the punctuation scheduling: either <a href="/{{version}}/documentation/streams/core-concepts#streams_time">stream time</a> or wall-clock time (by default, <b>stream time</b> is configured to represent event time via <code>TimestampExtractor</code>).
+        In addition, the <code>punctuate</code> function inside <code>Processor</code> is deprecated as well.
+    </p>
+
+    <p>
+        Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>), and hence the <code>punctuate</code> function was purely data-driven, because stream time is determined (and advanced forward) by the timestamps derived from the input data.
+        If no data arrives at the processor, the stream time does not advance and hence punctuation is not triggered.
+        When wall-clock time (i.e. <code>PunctuationType.SYSTEM_TIME</code>) is used instead, <code>punctuate</code> is triggered purely based on wall-clock time.
+        For example, if a <code>Punctuator</code> function is scheduled every 10 seconds based on <code>PunctuationType.SYSTEM_TIME</code> and a stream of 60 records is processed within 20 seconds, <code>punctuate</code> would be called 2 times (one time every 10 seconds);
+        if those 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
+        Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside the processor's <code>init()</code> method.
+    </p>
 
     <p>
         If you are monitoring on task level or processor-node / state store level Streams
metrics, please note that the metrics sensor name and hierarchy was changed:
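
As a sketch of the last point in the paragraph above, the fragment below registers two punctuators with different PunctuationTypes from a single processor's init(). It assumes the constant names used in this changeset (STREAM_TIME and SYSTEM_TIME; the released wall-clock constant may be named differently), and uses AbstractProcessor purely for brevity.

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

// Sketch: one processor registering two punctuators with different PunctuationTypes.
// The punctuator bodies are placeholders.
public class DualPunctuationProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);

        // data-driven: fires every 60 seconds of stream time, i.e. only while records arrive
        context.schedule(60000L, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long streamTime) {
                // e.g. emit aggregates that depend on record timestamps
            }
        });

        // wall-clock driven: fires every 10 seconds regardless of traffic
        context.schedule(10000L, PunctuationType.SYSTEM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long wallClockTime) {
                // e.g. report liveness or commit periodically even when the stream is idle
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        // record-by-record processing would go here
    }
}
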

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5310d64/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 05e39e9..ed0f9cf 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -82,7 +82,7 @@
         A Kafka Streams 1.0.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers
(it is not possible to connect to 0.10.0 brokers though). </li>
    <li> If you are monitoring on streams metrics, you will need to make some changes
to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy
was changed. </li>
    <li> There are a few public APIs, including <code>ProcessorContext#schedule()</code>,
<code>Processor#punctuate()</code>, <code>KStreamBuilder</code> and
<code>TopologyBuilder</code>, that are being deprecated by new APIs.
-        We recommend making corresponding code changes, which should be very minor since
the new APIs look very similary, when you upgrade.
+        We recommend making corresponding code changes, which should be very minor since
the new APIs look quite similar, when you upgrade.
     <li> See <a href="/{{version}}/documentation/streams#streams_api_changes_100">Streams
API changes in 1.0.0</a> for more details. </li>
 </ul>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5310d64/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 5f1ba6e..d386696 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -795,7 +795,7 @@ public interface KStream<K, V> {
      * computes zero or more output records.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code
<K':V'>, <K'':V''>, ...}.
      * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper)}).
-     * Furthermore, via {@link Transformer#punctuate(long)} the processing progress can be
observed and additional
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress can be observed and additional
      * periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
@@ -815,9 +815,9 @@ public interface KStream<K, V> {
      * <p>
      * Within the {@link Transformer}, the state is obtained via the
      * {@link  ProcessorContext}.
-     * To trigger periodic actions via {@link Transformer#punctuate(long) punctuate()}, a
schedule must be registered.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
punctuate()}, a schedule must be registered.
      * The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object,
Object)
-     * transform()} and {@link Transformer#punctuate(long) punctuate()}.
+     * transform()} and {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
punctuate()}.
      * <pre>{@code
      * new TransformerSupplier() {
      *     Transformer get() {
@@ -828,7 +828,9 @@ public interface KStream<K, V> {
      *             void init(ProcessorContext context) {
      *                 this.context = context;
      *                 this.state = context.getStateStore("myTransformState");
-     *                 context.schedule(1000); // call #punctuate() each 1000ms
+     *                 // punctuate each 1000ms; can access this.state
+     *                 // can emit as many new KeyValue pairs as required via this.context#forward()
+     *                 context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..));
      *             }
      *
      *             KeyValue transform(K key, V value) {
@@ -837,12 +839,6 @@ public interface KStream<K, V> {
      *                 return new KeyValue(key, value); // can emit a single value via return
-- can also be null
      *             }
      *
-     *             KeyValue punctuate(long timestamp) {
-     *                 // can access this.state
-     *                 // can emit as many new KeyValue pairs as required via this.context#forward()
-     *                 return null; // don't return result -- can also be "new KeyValue()"
-     *             }
-     *
      *             void close() {
      *                 // can access this.state
      *                 // can emit as many new KeyValue pairs as required via this.context#forward()
@@ -874,7 +870,7 @@ public interface KStream<K, V> {
      * record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record
{@code <K:V'>}.
      * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
-     * Furthermore, via {@link ValueTransformer#punctuate(long)} the processing progress
can be observed and additional
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress can be observed and additional
      * periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
@@ -894,7 +890,7 @@ public interface KStream<K, V> {
      * <p>
      * Within the {@link ValueTransformer}, the state is obtained via the
      * {@link ProcessorContext}.
-     * To trigger periodic actions via {@link ValueTransformer#punctuate(long) punctuate()},
a schedule must be
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
punctuate()}, a schedule must be
      * registered.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no
additional {@link KeyValue}
      * pairs should be emitted via {@link ProcessorContext#forward(Object, Object)
@@ -907,7 +903,7 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myValueTransformState");
-     *                 context.schedule(1000); // call #punctuate() each 1000ms
+     *                 context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..));
// punctuate each 1000ms, can access this.state
      *             }
      *
      *             NewValueType transform(V value) {
@@ -915,11 +911,6 @@ public interface KStream<K, V> {
      *                 return new NewValueType(); // or null
      *             }
      *
-     *             NewValueType punctuate(long timestamp) {
-     *                 // can access this.state
-     *                 return null; // don't return result -- can also be "new NewValueType()"
(current key will be used to build KeyValue pair)
-     *             }
-     *
      *             void close() {
      *                 // can access this.state
      *             }
@@ -947,7 +938,7 @@ public interface KStream<K, V> {
      * Process all records in this stream, one record at a time, by applying a {@link Processor}
(provided by the given
      * {@link ProcessorSupplier}).
      * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
-     * Furthermore, via {@link Processor#punctuate(long)} the processing progress can be
observed and additional
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}
the processing progress can be observed and additional
      * periodic actions can be performed.
      * Note that this is a terminal operation that returns void.
      * <p>
@@ -968,7 +959,7 @@ public interface KStream<K, V> {
      * <p>
      * Within the {@link Processor}, the state is obtained via the
      * {@link ProcessorContext}.
-     * To trigger periodic actions via {@link Processor#punctuate(long) punctuate()},
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
punctuate()},
      * a schedule must be registered.
      * <pre>{@code
      * new ProcessorSupplier() {
@@ -978,17 +969,13 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myProcessorState");
-     *                 context.schedule(1000); // call #punctuate() each 1000ms
+     *                 context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..));
// punctuate each 1000ms, can access this.state
      *             }
      *
      *             void process(K key, V value) {
      *                 // can access this.state
      *             }
      *
-     *             void punctuate(long timestamp) {
-     *                 // can access this.state
-     *             }
-     *
      *             void close() {
      *                 // can access this.state
      *             }
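
Finally, the updated KStream#transform() javadoc above can be put into concrete form. The sketch below follows the javadoc's pattern under the new API; the store name "myTransformState" comes from the javadoc, while the pass-through logic and the forwarded marker record are illustrative assumptions. As the javadoc describes, the store still has to be registered beforehand and named in the transform() call.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch of the Transformer pattern from the javadoc above, with the new schedule()
// overload in place of the deprecated punctuate() override.
public class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, String> state;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.state = (KeyValueStore<String, String>) context.getStateStore("myTransformState");

        // punctuate each 1000 ms; can access state and emit new pairs via context.forward()
        context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // illustrative: forward a marker record on every punctuation
                MyTransformer.this.context.forward("punctuate-marker", String.valueOf(timestamp));
            }
        });
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        state.put(key, value);            // can access the state store
        return KeyValue.pair(key, value); // can emit a single value -- can also be null
    }

    @Override
    @Deprecated
    public KeyValue<String, String> punctuate(final long timestamp) {
        return null;  // deprecated by KIP-138; periodic work now lives in the Punctuator above
    }

    @Override
    public void close() { }
}
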

