kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: MINOR: additional kip-182 doc updates
Date Mon, 02 Oct 2017 20:20:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cdbf806e2 -> cc84686a4


MINOR: additional kip-182 doc updates

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Michael G. Noll <michael@confluent.io>, Bill Bejeck <bill@confluent.io>,
Matthias J. Sax <matthias@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3971 from dguy/kip-182-docs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc84686a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc84686a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc84686a

Branch: refs/heads/trunk
Commit: cc84686a4aa24e541f7ca5ee9dcb0dea0ddbd79a
Parents: cdbf806
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Oct 2 13:20:49 2017 -0700
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Oct 2 13:20:49 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html | 244 +++++++++++++++++----------------
 1 file changed, 128 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cc84686a/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index a064a5d..842325b 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1383,65 +1383,72 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                     // Java 8+ examples, using lambda expressions
 
                     // Aggregating with time-based windowing (here: with 5-minute tumbling
windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream.aggregate(
-                        () -> 0L, /* initializer */
-                        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder
*/
-                        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window
*/
-                        Serdes.Long(), /* serde for aggregate value */
-                        "time-windowed-aggregated-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream
+                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based
window */
+                        .aggregate(
+                            () -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder
*/
+                            Materialized.&lt;String, Long, WindowStore&lt;Bytes,
byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
+                                .withValueSerde(Serdes.Long())); /* serde for aggregate value
*/
+
 
                     // Aggregating with session-based windowing (here: with an inactivity
gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream
= groupedStream.aggregate(
-                        () -> 0L, /* initializer */
-                        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder
*/
-                        (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue,
/* session merger */
-                        SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window
*/
-                        Serdes.Long(), /* serde for aggregate value */
-                        "sessionized-aggregated-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream
= groupedStream
+                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /*
session window */
+                        .aggregate(
+                            () -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder
*/
+                            (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue,
/* session merger */
+                            Materialized.&lt;String, Long, SessionStore&lt;Bytes,
byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
+                                .withValueSerde(Serdes.Long())); /* serde for aggregate value
*/
 
                     // Java 7 examples
 
                     // Aggregating with time-based windowing (here: with 5-minute tumbling
windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream.aggregate(
-                        new Initializer&lt;Long&gt;() { /* initializer */
-                          @Override
-                          public Long apply() {
-                            return 0L;
-                          }
-                        },
-                        new Aggregator&lt;String, Long, Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(String aggKey, Long newValue, Long aggValue)
{
-                            return aggValue + newValue;
-                          }
-                        },
-                        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window
*/
-                        Serdes.Long(), /* serde for aggregate value */
-                        "time-windowed-aggregated-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream
+                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based
window */
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;String, Long, Long&gt;() { /* adder
*/
+                              @Override
+                              public Long apply(String aggKey, Long newValue, Long aggValue)
{
+                                return aggValue + newValue;
+                              }
+                            },
+                            Materialized.&lt;String, Long, WindowStore&lt;Bytes,
byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") /* state store name */
+                                    .withValueSerde(Serdes.Long()) /* serde for aggregate
value */
+                    );
 
                     // Aggregating with session-based windowing (here: with an inactivity
gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream
= groupedStream.aggregate(
-                        new Initializer&lt;Long&gt;() { /* initializer */
-                          @Override
-                          public Long apply() {
-                            return 0L;
-                          }
-                        },
-                        new Aggregator&lt;String, Long, Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(String aggKey, Long newValue, Long aggValue)
{
-                            return aggValue + newValue;
-                          }
-                        },
-                        new Merger&lt;String, Long&gt;() { /* session merger */
-                          @Override
-                          public Long apply(String aggKey, Long leftAggValue, Long rightAggValue)
{
-                            return rightAggValue + leftAggValue;
-                          }
-                        },
-                        SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window
*/
-                        Serdes.Long(), /* serde for aggregate value */
-                        "sessionized-aggregated-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream
= groupedStream
+                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /*
session window */
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;String, Long, Long&gt;() { /* adder
*/
+                              @Override
+                              public Long apply(String aggKey, Long newValue, Long aggValue)
{
+                                return aggValue + newValue;
+                              }
+                            },
+                            new Merger&lt;String, Long&gt;() { /* session merger
*/
+                              @Override
+                              public Long apply(String aggKey, Long leftAggValue, Long rightAggValue)
{
+                                return rightAggValue + leftAggValue;
+                              }
+                            },
+                            Materialized.&lt;String, Long, SessionStore&lt;Bytes,
byte[]&gt;&gt;as("sessionized-aggregated-stream-store") /* state store name */
+                                .withValueSerde(Serdes.Long()) /* serde for aggregate value
*/
+                    );
                 </pre>
 
                 <p>
@@ -1478,12 +1485,10 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                     KGroupedTable&lt;String, Long&gt; groupedTable = ...;
 
                     // Counting a KGroupedStream
-                    KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count(
-                        "counted-stream-store" /* state store name */);
+                    KTable&lt;String, Long&gt; aggregatedStream = groupedStream.count();
 
                     // Counting a KGroupedTable
-                    KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count(
-                        "counted-table-store" /* state store name */);
+                    KTable&lt;String, Long&gt; aggregatedTable = groupedTable.count();
                 </pre>
                 <p>
                     Detailed behavior for <code>KGroupedStream</code>:
@@ -1518,14 +1523,14 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                     KGroupedStream&lt;String, Long&gt; groupedStream = ...;
 
                     // Counting a KGroupedStream with time-based windowing (here: with 5-minute
tumbling windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream
= groupedStream.count(
-                        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window
*/
-                        "time-windowed-counted-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream
= groupedStream
+                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based
window */
+                        .count();
 
                     // Counting a KGroupedStream with session-based windowing (here: with
5-minute inactivity gaps)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream
= groupedStream.count(
-                        SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window
*/
-                        "sessionized-counted-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; aggregatedStream
= groupedStream
+                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /*
session window */
+                        .count();
                 </pre>
                 <p>
                     Detailed behavior:
@@ -1561,14 +1566,14 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
 
                     // Reducing a KGroupedStream
                     KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
-                        (aggValue, newValue) -> aggValue + newValue, /* adder */
-                        "reduced-stream-store" /* state store name */);
+                        (aggValue, newValue) -> aggValue + newValue /* adder */
+                    );
 
                     // Reducing a KGroupedTable
                     KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
                         (aggValue, newValue) -> aggValue + newValue, /* adder */
-                        (aggValue, oldValue) -> aggValue - oldValue, /* subtractor */
-                        "reduced-table-store" /* state store name */);
+                        (aggValue, oldValue) -> aggValue - oldValue /* subtractor */
+                    );
 
 
                     // Java 7 examples
@@ -1580,8 +1585,8 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                           public Long apply(Long aggValue, Long newValue) {
                             return aggValue + newValue;
                           }
-                        },
-                        "reduced-stream-store" /* state store name */);
+                        }
+                    );
 
                     // Reducing a KGroupedTable
                     KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
@@ -1596,8 +1601,8 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                           public Long apply(Long aggValue, Long oldValue) {
                             return aggValue - oldValue;
                           }
-                        },
-                        "reduced-table-store" /* state store name */);
+                        }
+                    );
                 </pre>
                 <p>
                     Detailed behavior for <code>KGroupedStream</code>:
@@ -1659,41 +1664,39 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                     // Java 8+ examples, using lambda expressions
 
                     // Aggregating with time-based windowing (here: with 5-minute tumbling
windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream.reduce(
-                        (aggValue, newValue) -> aggValue + newValue, /* adder */
-                        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window
*/
-                        "time-windowed-reduced-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream
+                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based
window */
+                        .reduce((aggValue, newValue) -> aggValue + newValue /* adder */);
 
                     // Aggregating with session-based windowing (here: with an inactivity
gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionzedAggregatedStream
= groupedStream.reduce(
-                        (aggValue, newValue) -> aggValue + newValue, /* adder */
-                        SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window
*/
-                        "sessionized-reduced-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream
= groupedStream
+                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /*
session window */
+                        .reduce((aggValue, newValue) -> aggValue + newValue); /* adder
*/
 
 
                     // Java 7 examples
 
                     // Aggregating with time-based windowing (here: with 5-minute tumbling
windows)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream.reduce(
-                        new Reducer&lt;Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(Long aggValue, Long newValue) {
-                            return aggValue + newValue;
-                          }
-                        },
-                        TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window
*/
-                        "time-windowed-reduced-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream
+                        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based
window */
+                        .reduce(
+                            new Reducer&lt;Long&gt;() { /* adder */
+                              @Override
+                              public Long apply(Long aggValue, Long newValue) {
+                                return aggValue + newValue;
+                              }
+                            });
 
                     // Aggregating with session-based windowing (here: with an inactivity
gap of 5 minutes)
-                    KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream
= groupedStream.reduce(
-                        new Reducer&lt;Long&gt;() { /* adder */
-                          @Override
-                          public Long apply(Long aggValue, Long newValue) {
-                            return aggValue + newValue;
-                          }
-                        },
-                        SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window
*/
-                        "sessionized-reduced-stream-store" /* state store name */);
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream
= groupedStream
+                        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /*
session window */
+                        .reduce(
+                            new Reducer&lt;Long&gt;() { /* adder */
+                              @Override
+                              public Long apply(Long aggValue, Long newValue) {
+                                return aggValue + newValue;
+                              }
+                        });
                 </pre>
 
                 <p>
@@ -1723,16 +1726,22 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
     </p>
     <pre class="brush: java;">
         // Key: word, value: count
+        Properties streamsProperties = ...;
+
+        // specify the default serdes so we don't need to elsewhere.
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        StreamsConfig config = new StreamsConfig(streamsProperties);
+
         KStream&lt;String, Integer&gt; wordCounts = ...;
 
         KGroupedStream&lt;String, Integer&gt; groupedStream = wordCounts
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()));
+            .groupByKey();
 
         KTable&lt;String, Integer&gt; aggregated = groupedStream.aggregate(
             () -> 0, /* initializer */
-            (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
-            Serdes.Integer(), /* serde for aggregate value */
-            "aggregated-stream-store" /* state store name */);
+            (aggKey, newValue, aggValue) -> aggValue + newValue /* adder */
+        );
     </pre>
 
     <p>
@@ -1836,8 +1845,10 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
             () -> 0, /* initializer */
             (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
             (aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
-            Serdes.Integer(), /* serde for aggregate value */
-            "aggregated-table-store" /* state store name */);
+            Materialized.&lt;String, Integer, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("aggregated-table-store")
+                .withKeySerde(Serdes.String() /* serde for aggregate key */)
+                .withValueSerde(Serdes.Integer() /* serde for aggregate value */)
+        );
     </pre>
     <p>
         <b>Impact of <a href=#streams_developer-guide_memory-management_record-cache>record
caches</a></b>:
@@ -2253,7 +2264,10 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
             .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
 
           // Create a window state store named "CountsWindowStore" that contains the word
counts for every minute
-          groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
+          groupedByWord.windowedBy(TimeWindows.of(60000))
+            .count(Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as("CountsWindowStore")
+                .withKeySerde(Serdes.String()) // count() sets value serde to Serdes.Long()
automatically
+        );
         </pre>
 
     <p>
@@ -2396,14 +2410,14 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
           Topology topology = ...;
           ProcessorSupplier processorSuppler = ...;
 
-          // Create CustomStoreSupplier for store name the-custom-store
-          MyCustomStoreSuppler customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
+          // Create CustomStoreBuilder for store name the-custom-store
+          MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store");
           // Add the source topic
           topology.addSource("input", "inputTopic");
           // Add a custom processor that reads from the source topic
           topology.addProcessor("the-processor", processorSupplier, "input");
           // Connect your custom state store to the custom processor above
-          topology.addStateStore(customStoreSupplier, "the-processor");
+          topology.addStateStore(customStoreBuilder, "the-processor");
 
           KafkaStreams streams = new KafkaStreams(topology, config);
           streams.start();
@@ -2478,7 +2492,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
 
           // This call to `count()` creates a state store named "word-count".
           // The state store is discoverable and can be queried interactively.
-          groupedByWord.count("word-count");
+          groupedByWord.count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes,
byte[]&gt;&gt;as("word-count"));
 
           // Start an instance of the topology
           KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -2835,22 +2849,20 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
 
     <p>
         For changelog topics you can also override the default configs on a per store basis.
-        This can be done by using any method overload that has a <code>StateStoreSupplier</code>
as a parameter:
+        This can be done by using any method overload that has a <code>Materialized</code>
as a parameter:
     </p>
 
     <pre class="brush: java;">
         // a map to add topic config
-        Map&lt;String, String&gt; topicConfig = new HashMap<>();
+        Map&lt;String, String&gt; topicConfig = new HashMap&lt;&gt;();
         topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000");
 
-        StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent()
-                .enableLogging(topicConfig) // pass in the config overrides
-                .build();
-        
-        groupedStream.count(supplier)
+        final Materialized&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;
materialized = Materialized.as("store")
+            .withKeySerde(Serdes.String())
+            .withValueSerde(Serdes.Long())
+            .withLoggingEnabled(topicConfig); // pass in the config overrides
+
+        groupedStream.count(materialized);
     </pre>
 
     <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka
Streams Application</a></h4>


Mime
View raw message