kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: update web docs and examples of Streams with Java8 syntax (#5249)
Date Thu, 21 Jun 2018 17:03:02 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d3e264e  MINOR: update web docs and examples of Streams with Java8 syntax (#5249)
d3e264e is described below

commit d3e264e773c4652f34b40c7c3b494c0f7fbabffc
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jun 21 10:02:58 2018 -0700

    MINOR: update web docs and examples of Streams with Java8 syntax (#5249)
    
    Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>
---
 docs/streams/developer-guide/dsl-api.html          | 10 +--
 docs/streams/developer-guide/testing.html          | 25 ++-----
 .../examples/pageview/PageViewTypedDemo.java       | 84 +++++++++++-----------
 .../examples/pageview/PageViewUntypedDemo.java     | 54 +++++---------
 .../kafka/streams/examples/pipe/PipeDemo.java      |  2 +-
 .../examples/temperature/TemperatureDemo.java      | 32 +++------
 .../streams/examples/wordcount/WordCountDemo.java  | 18 +----
 .../examples/wordcount/WordCountProcessorDemo.java | 22 +++---
 .../src/main/java/LineSplit.java                   | 14 ----
 .../src/main/java/WordCount.java                   | 22 ------
 10 files changed, 88 insertions(+), 195 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index cd3a965..beb83a3 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -898,9 +898,9 @@
 <span class="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling
windows)</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span
class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span
class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span>
<span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span>
<span class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span class="n">TimeUnit</span><span
class="o">.</span><span class="na [...]
     <span class="o">.</span><span class="na">aggregate</span><span
class="o">(</span>
-      <span class="o">()</span> <span class="o">-&gt;</span>
<span class="mi">0</span><span class="n">L</span><span class="o">,</span>
<span class="cm">/* initializer */</span>
-    	<span class="o">(</span><span class="n">aggKey</span><span
class="o">,</span> <span class="n">newValue</span><span class="o">,</span>
<span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span>
<span class="n">aggValue</span> <span class="o">+</span> <span
class="n">newValue</span><span class="o">,</span> <span class="cm">/*
adder */</span>
-      <span class="n">Materialized</span><span class="o">.&lt;</span><span
class="n">String</span><span class="o">,</span> <span class="n">Long</span><span
class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span
class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span
class="o">[]&gt;&gt;</span><span class="n">as</span><span
class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span
class="o">)</sp [...]
+        <span class="o">()</span> <span class="o">-&gt;</span>
<span class="mi">0</span><span class="n">L</span><span class="o">,</span>
<span class="cm">/* initializer */</span>
+        <span class="o">(</span><span class="n">aggKey</span><span
class="o">,</span> <span class="n">newValue</span><span class="o">,</span>
<span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span>
<span class="n">aggValue</span> <span class="o">+</span> <span
class="n">newValue</span><span class="o">,</span> <span class="cm">/*
adder */</span>
+        <span class="n">Materialized</span><span class="o">.&lt;</span><span
class="n">String</span><span class="o">,</span> <span class="n">Long</span><span
class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span
class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span
class="o">[]&gt;&gt;</span><span class="n">as</span><span
class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span
class="o">)</ [...]
         <span class="o">.</span><span class="na">withValueSerde</span><span
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">()));</span> <span class="cm">/*
serde for aggregate value */</span>
 
 <span class="c1">// Aggregating with session-based windowing (here: with an inactivity
gap of 5 minutes)</span>
@@ -908,8 +908,8 @@
     <span class="n">aggregate</span><span class="o">(</span>
     	<span class="o">()</span> <span class="o">-&gt;</span> <span
class="mi">0</span><span class="n">L</span><span class="o">,</span>
<span class="cm">/* initializer */</span>
     	<span class="o">(</span><span class="n">aggKey</span><span
class="o">,</span> <span class="n">newValue</span><span class="o">,</span>
<span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span>
<span class="n">aggValue</span> <span class="o">+</span> <span
class="n">newValue</span><span class="o">,</span> <span class="cm">/*
adder */</span>
-    	<span class="o">(</span><span class="n">aggKey</span><span
class="o">,</span> <span class="n">leftAggValue</span><span class="o">,</span>
<span class="n">rightAggValue</span><span class="o">)</span> <span
class="o">-&gt;</span> <span class="n">leftAggValue</span> <span
class="o">+</span> <span class="n">rightAggValue</span><span class="o">,</span>
<span class="cm">/* session merger */</span>
-	    <span class="n">Materialized</span><span class="o">.&lt;</span><span
class="n">String</span><span class="o">,</span> <span class="n">Long</span><span
class="o">,</span> <span class="n">SessionStore</span><span class="o">&lt;</span><span
class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span
class="o">[]&gt;&gt;</span><span class="n">as</span><span
class="o">(</span><span class="s">&quot;sessionized-aggregated-stream-store&quot;</span><span
class="o">)</span [...]
+        <span class="o">(</span><span class="n">aggKey</span><span
class="o">,</span> <span class="n">leftAggValue</span><span class="o">,</span>
<span class="n">rightAggValue</span><span class="o">)</span> <span
class="o">-&gt;</span> <span class="n">leftAggValue</span> <span
class="o">+</span> <span class="n">rightAggValue</span><span class="o">,</span>
<span class="cm">/* session merger */</span>
+        <span class="n">Materialized</span><span class="o">.&lt;</span><span
class="n">String</span><span class="o">,</span> <span class="n">Long</span><span
class="o">,</span> <span class="n">SessionStore</span><span class="o">&lt;</span><span
class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span
class="o">[]&gt;&gt;</span><span class="n">as</span><span
class="o">(</span><span class="s">&quot;sessionized-aggregated-stream-store&quot;</span><span
class="o">)</s [...]
         <span class="o">.</span><span class="na">withValueSerde</span><span
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">()));</span> <span class="cm">/*
serde for aggregate value */</span>
 
 <span class="c1">// Java 7 examples</span>
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index 92d8fce..bdecc43 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -255,18 +255,8 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
     @Override
     public void init(ProcessorContext context) {
         this.context = context;
-        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
-            @Override
-            public void punctuate(long timestamp) {
-                flushStore();
-            }
-        });
-        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
-            @Override
-            public void punctuate(long timestamp) {
-                flushStore();
-            }
-        });
+        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, time -&gt; flushStore());
+        context.schedule(10000, PunctuationType.STREAM_TIME, time -&gt; flushStore());
         store = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("aggStore");
     }
 
@@ -287,9 +277,6 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
     }
 
     @Override
-    public void punctuate(long timestamp) {} // deprecated; not used
-
-    @Override
     public void close() {}
 }
         </pre>
@@ -407,12 +394,8 @@ punctuator.punctuate(/*timestamp*/ 0L);
         </div>
     </div>
     <div class="pagination">
-        <div class="pagination">
-            <a href="/{{version}}/documentation/streams/developer-guide/datatypes"
-               class="pagination__btn pagination__btn__prev">Previous</a>
-            <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries"
-               class="pagination__btn pagination__btn__next">Next</a>
-        </div>
+        <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn
pagination__btn__prev">Previous</a>
+        <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries"
class="pagination__btn pagination__btn__next">Next</a>
     </div>
 </script>
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index bd24e84..234d3fc 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -28,16 +28,14 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -83,7 +81,7 @@ public class PageViewTypedDemo {
         public String region;
     }
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -151,56 +149,56 @@ public class PageViewTypedDemo {
                                                           Consumed.with(Serdes.String(),
userProfileSerde));
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-            .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>()
{
-                @Override
-                public PageViewByRegion apply(PageView view, UserProfile profile) {
-                    PageViewByRegion viewByRegion = new PageViewByRegion();
-                    viewByRegion.user = view.user;
-                    viewByRegion.page = view.page;
-
-                    if (profile != null) {
-                        viewByRegion.region = profile.region;
-                    } else {
-                        viewByRegion.region = "UNKNOWN";
-                    }
-                    return viewByRegion;
-                }
-            })
-            .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>()
{
-                @Override
-                public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion
viewRegion) {
-                    return new KeyValue<>(viewRegion.region, viewRegion);
+            .leftJoin(users, (view, profile) -> {
+                PageViewByRegion viewByRegion = new PageViewByRegion();
+                viewByRegion.user = view.user;
+                viewByRegion.page = view.page;
+
+                if (profile != null) {
+                    viewByRegion.region = profile.region;
+                } else {
+                    viewByRegion.region = "UNKNOWN";
                 }
+                return viewByRegion;
             })
+            .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
             .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
             .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
             .count()
             .toStream()
-            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion,
RegionCount>>() {
-                @Override
-                public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String>
key, Long value) {
-                    WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                    wViewByRegion.windowStart = key.window().start();
-                    wViewByRegion.region = key.key();
-
-                    RegionCount rCount = new RegionCount();
-                    rCount.region = key.key();
-                    rCount.count = value;
-
-                    return new KeyValue<>(wViewByRegion, rCount);
-                }
+            .map((key, value) -> {
+                WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                wViewByRegion.windowStart = key.window().start();
+                wViewByRegion.region = key.key();
+
+                RegionCount rCount = new RegionCount();
+                rCount.region = key.key();
+                rCount.count = value;
+
+                return new KeyValue<>(wViewByRegion, rCount);
             });
 
         // write to the result topic
         regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde,
regionCountSerde));
 
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data
is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
     }
 }
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index c38d685..dddb542 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -33,13 +33,9 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
 
 import java.util.Properties;
 
@@ -79,46 +75,30 @@ public class PageViewUntypedDemo {
 
         KTable<String, JsonNode> users = builder.table("streams-userprofile-input",
consumed);
 
-        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode,
String>() {
-            @Override
-            public String apply(JsonNode record) {
-                return record.get("region").textValue();
-            }
-        });
+        KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
 
         KStream<JsonNode, JsonNode> regionCount = views
-            .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
-                @Override
-                public JsonNode apply(JsonNode view, String region) {
-                    ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                    return jNode.put("user", view.get("user").textValue())
-                            .put("page", view.get("page").textValue())
-                            .put("region", region == null ? "UNKNOWN" : region);
-                }
-            })
-            .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>()
{
-                @Override
-                public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion)
{
-                    return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
-                }
+            .leftJoin(userRegions, (view, region) -> {
+                ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+                return (JsonNode) jNode.put("user", view.get("user").textValue())
+                        .put("page", view.get("page").textValue())
+                        .put("region", region == null ? "UNKNOWN" : region);
+
             })
+            .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(),
viewRegion))
             .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
             .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
             .count()
             .toStream()
-            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode,
JsonNode>>() {
-                @Override
-                public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key,
Long value) {
-                    ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                    keyNode.put("window-start", key.window().start())
-                            .put("region", key.key());
-
-                    ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                    valueNode.put("count", value);
-
-                    return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-                }
+            .map((key, value) -> {
+                ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                keyNode.put("window-start", key.window().start())
+                        .put("region", key.key());
+
+                ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                valueNode.put("count", value);
+
+                return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
             });
 
         // write to the result topic
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 5389877..d61e174 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -38,7 +38,7 @@ import java.util.concurrent.CountDownLatch;
  */
 public class PipeDemo {
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index c5eb5f9..4607d75 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -23,10 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
@@ -71,7 +68,7 @@ public class TemperatureDemo {
     // window size within which the filtering is applied
     private static final int TEMPERATURE_WINDOW_SIZE = 5;
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
 
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
@@ -89,30 +86,17 @@ public class TemperatureDemo {
         KStream<Windowed<String>, String> max = source
             // temperature values are sent without a key (null), so in order
             // to group and reduce them, a key is needed ("temp" has been chosen)
-            .selectKey(new KeyValueMapper<String, String, String>() {
-                @Override
-                public String apply(String key, String value) {
-                    return "temp";
-                }
-            })
+            .selectKey((key, value) -> "temp")
             .groupByKey()
             .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
-            .reduce(new Reducer<String>() {
-                @Override
-                public String apply(String value1, String value2) {
-                    if (Integer.parseInt(value1) > Integer.parseInt(value2))
-                        return value1;
-                    else
-                        return value2;
-                }
+            .reduce((value1, value2) -> {
+                if (Integer.parseInt(value1) > Integer.parseInt(value2))
+                    return value1;
+                else
+                    return value2;
             })
             .toStream()
-            .filter(new Predicate<Windowed<String>, String>() {
-                @Override
-                public boolean test(Windowed<String> key, String value) {
-                    return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
-                }
-            });
+            .filter((key, value) -> Integer.parseInt(value) > TEMPERATURE_THRESHOLD);
 
         Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 7535315..4f0150e 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -23,9 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.util.Arrays;
 import java.util.Locale;
@@ -46,7 +44,7 @@ import java.util.concurrent.CountDownLatch;
  */
 public class WordCountDemo {
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -64,18 +62,8 @@ public class WordCountDemo {
         KStream<String, String> source = builder.stream("streams-plaintext-input");
 
         KTable<String, Long> counts = source
-            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                @Override
-                public Iterable<String> apply(String value) {
-                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
-                }
-            })
-            .groupBy(new KeyValueMapper<String, String, String>() {
-                @Override
-                public String apply(String key, String value) {
-                    return value;
-                }
-            })
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
+            .groupBy((key, value) -> value)
             .count();
 
         // need to override value serde to Long type
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 523bb46..86feaeb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -61,19 +60,16 @@ public class WordCountProcessorDemo {
                 @SuppressWarnings("unchecked")
                 public void init(final ProcessorContext context) {
                     this.context = context;
-                    this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator()
{
-                        @Override
-                        public void punctuate(long timestamp) {
-                            try (KeyValueIterator<String, Integer> iter = kvStore.all())
{
-                                System.out.println("----------- " + timestamp + " ----------- ");
+                    this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp ->
{
+                        try (KeyValueIterator<String, Integer> iter = kvStore.all())
{
+                            System.out.println("----------- " + timestamp + " ----------- ");
 
-                                while (iter.hasNext()) {
-                                    KeyValue<String, Integer> entry = iter.next();
+                            while (iter.hasNext()) {
+                                KeyValue<String, Integer> entry = iter.next();
 
-                                    System.out.println("[" + entry.key + ", " + entry.value
+ "]");
+                                System.out.println("[" + entry.key + ", " + entry.value +
"]");
 
-                                    context.forward(entry.key, entry.value.toString());
-                                }
+                                context.forward(entry.key, entry.value.toString());
                             }
                         }
                     });
@@ -103,7 +99,7 @@ public class WordCountProcessorDemo {
         }
     }
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -123,7 +119,7 @@ public class WordCountProcessorDemo {
                 Stores.inMemoryKeyValueStore("Counts"),
                 Serdes.String(),
                 Serdes.Integer()),
-                              "Process");
+                "Process");
 
         builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
index ec40d2a..bbf54e6 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
@@ -44,24 +44,10 @@ public class LineSplit {
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        builder.<String, String>stream("streams-plaintext-input")
-               .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.split("\\W+"));
-                    }
-                })
-               .to("streams-linesplit-output");
-
-        /* ------- use the code below for Java 8 and uncomment the above ----
-
         builder.stream("streams-plaintext-input")
                .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .to("streams-linesplit-output");
 
-           ----------------------------------------------------------------- */
-
-
         final Topology topology = builder.build();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         final CountDownLatch latch = new CountDownLatch(1);
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 020eb03..bdbefed 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -51,34 +51,12 @@ public class WordCount {
         final StreamsBuilder builder = new StreamsBuilder();
 
         builder.<String, String>stream("streams-plaintext-input")
-               .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                    }
-                })
-               .groupBy(new KeyValueMapper<String, String, String>() {
-                   @Override
-                   public String apply(String key, String value) {
-                       return value;
-                   }
-                })
-               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
-               .toStream()
-               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
-
-
-        /* ------- use the code below for Java 8 and comment the above ----
-
-        builder.<String, String>stream("streams-plaintext-input")
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
-           ----------------------------------------------------------------- */
-
         final Topology topology = builder.build();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         final CountDownLatch latch = new CountDownLatch(1);


Mime
View raw message