From: guozhang@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Date: Thu, 21 Jun 2018 17:03:02 +0000
Subject: [kafka] branch trunk updated: MINOR: update web docs and examples of Streams with Java8 syntax (#5249)

This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new d3e264e  MINOR: update web docs and examples of Streams with Java8 syntax (#5249)
d3e264e is described below

commit d3e264e773c4652f34b40c7c3b494c0f7fbabffc
Author: Guozhang Wang
AuthorDate: Thu Jun 21 10:02:58 2018 -0700

    MINOR: update web docs and examples of Streams with Java8 syntax (#5249)

    Reviewers: John Roesler, Bill Bejeck, Damian Guy
---
 docs/streams/developer-guide/dsl-api.html          | 10 +--
 docs/streams/developer-guide/testing.html          | 25 ++-----
 .../examples/pageview/PageViewTypedDemo.java       | 84 +++++++++++-----------
 .../examples/pageview/PageViewUntypedDemo.java     | 54 +++++---------
 .../kafka/streams/examples/pipe/PipeDemo.java      |  2 +-
 .../examples/temperature/TemperatureDemo.java      | 32 +++------
 .../streams/examples/wordcount/WordCountDemo.java  | 18 +----
 .../examples/wordcount/WordCountProcessorDemo.java | 22 +++---
 .../src/main/java/LineSplit.java                   | 14 ----
 .../src/main/java/WordCount.java                   | 22 ------
 10 files changed, 88 insertions(+), 195 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index cd3a965..beb83a3 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -898,9 +898,9 @@
 // Aggregating with time-based windowing (here: with 5-minute tumbling windows)
 KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))).aggregate(
-    () -> 0L, /* initializer */
-    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
-    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
+    () -> 0L, /* initializer */
+    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
+    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
         .withValueSerde(Serdes.Long())); /* serde for aggregate value */

 // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
@@ -908,8 +908,8 @@ aggregate(
     () -> 0L, /* initializer */
     (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
-    (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
-    Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store")
+    (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
+    Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store")
         .withValueSerde(Serdes.Long())); /* serde for aggregate value */

 // Java 7 examples
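For readers following the docs change: the hunks above show the Java 8 style the
page now uses, with the initializer, adder, and session merger written as lambdas.
Below is a minimal self-contained sketch of the same time-windowed aggregation;
the class name, topic name, and serde choices are illustrative assumptions, not
part of this commit.

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KGroupedStream;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.WindowStore;

    public class WindowedAggregationSketch {           // hypothetical class name
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // "aggregation-input" is a made-up topic name for illustration
            KStream<String, Long> stream =
                builder.stream("aggregation-input", Consumed.with(Serdes.String(), Serdes.Long()));
            KGroupedStream<String, Long> groupedStream =
                stream.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()));

            // Java 8 style: initializer and adder as lambdas instead of
            // Initializer/Aggregator anonymous classes
            KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
                .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
                .aggregate(
                    () -> 0L,                                            /* initializer */
                    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
                    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
                        .withValueSerde(Serdes.Long()));                 /* serde for aggregate value */

            // printing the topology is enough to verify the sketch wires up
            System.out.println(builder.build().describe());
        }
    }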
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index 92d8fce..bdecc43 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -255,18 +255,8 @@ public class CustomMaxAggregator implements Processor<String, Long> {
     @Override
     public void init(ProcessorContext context) {
         this.context = context;
-        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
-            @Override
-            public void punctuate(long timestamp) {
-                flushStore();
-            }
-        });
-        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
-            @Override
-            public void punctuate(long timestamp) {
-                flushStore();
-            }
-        });
+        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
+        context.schedule(10000, PunctuationType.STREAM_TIME, time -> flushStore());
         store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
     }
@@ -287,9 +277,6 @@ public class CustomMaxAggregator implements Processor<String, Long> {
     }

     @Override
-    public void punctuate(long timestamp) {} // deprecated; not used
-
-    @Override
     public void close() {}
 }
@@ -407,12 +394,8 @@ punctuator.punctuate(/*timestamp*/ 0L);
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index bd24e84..234d3fc 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -28,16 +28,14 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;

 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;

 /**
@@ -83,7 +81,7 @@ public class PageViewTypedDemo {
         public String region;
     }

-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -151,56 +149,56 @@ public class PageViewTypedDemo {
                 Consumed.with(Serdes.String(), userProfileSerde));

         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-            .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
-                @Override
-                public PageViewByRegion apply(PageView view, UserProfile profile) {
-                    PageViewByRegion viewByRegion = new PageViewByRegion();
-                    viewByRegion.user = view.user;
-                    viewByRegion.page = view.page;
-
-                    if (profile != null) {
-                        viewByRegion.region = profile.region;
-                    } else {
-                        viewByRegion.region = "UNKNOWN";
-                    }
-                    return viewByRegion;
-                }
-            })
-            .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
-                @Override
-                public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
-                    return new KeyValue<>(viewRegion.region, viewRegion);
+            .leftJoin(users, (view, profile) -> {
+                PageViewByRegion viewByRegion = new PageViewByRegion();
+                viewByRegion.user = view.user;
+                viewByRegion.page = view.page;
+
+                if (profile != null) {
+                    viewByRegion.region = profile.region;
+                } else {
+                    viewByRegion.region = "UNKNOWN";
                 }
+                return viewByRegion;
             })
+            .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
             .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
             .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
             .count()
             .toStream()
-            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
-                @Override
-                public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
-                    WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                    wViewByRegion.windowStart = key.window().start();
-                    wViewByRegion.region = key.key();
-
-                    RegionCount rCount = new RegionCount();
-                    rCount.region = key.key();
-                    rCount.count = value;
-
-                    return new KeyValue<>(wViewByRegion, rCount);
-                }
+            .map((key, value) -> {
+                WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                wViewByRegion.windowStart = key.window().start();
+                wViewByRegion.region = key.key();
+
+                RegionCount rCount = new RegionCount();
+                rCount.region = key.key();
+                rCount.count = value;
+
+                return new KeyValue<>(wViewByRegion, rCount);
             });

         // write to the result topic
         regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));

         KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
     }
 }
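Note the last hunk: the fixed Thread.sleep(5000L) is replaced with the
CountDownLatch-plus-shutdown-hook pattern the other examples already use, which is
also why main() no longer declares "throws Exception" (the InterruptedException
from latch.await() is caught locally). A stripped-down sketch of just that
pattern follows; the class name, application id, and topic names are made up for
illustration.

    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class ShutdownHookSketch {                  // hypothetical class name
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shutdown-hook-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("sketch-input").to("sketch-output");   // trivial pass-through topology

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            final CountDownLatch latch = new CountDownLatch(1);

            // close the streams instance on SIGTERM/control-c, then release main()
            Runtime.getRuntime().addShutdownHook(new Thread("sketch-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });

            try {
                streams.start();
                latch.await();    // block until the shutdown hook fires
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }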
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index c38d685..dddb542 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -33,13 +33,9 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;

 import java.util.Properties;
@@ -79,46 +75,30 @@ public class PageViewUntypedDemo {
         KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed);

-        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
-            @Override
-            public String apply(JsonNode record) {
-                return record.get("region").textValue();
-            }
-        });
+        KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());

         KStream<JsonNode, JsonNode> regionCount = views
-            .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
-                @Override
-                public JsonNode apply(JsonNode view, String region) {
-                    ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                    return jNode.put("user", view.get("user").textValue())
-                            .put("page", view.get("page").textValue())
-                            .put("region", region == null ? "UNKNOWN" : region);
-                }
-            })
-            .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
-                @Override
-                public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
-                    return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
-                }
+            .leftJoin(userRegions, (view, region) -> {
+                ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+                return (JsonNode) jNode.put("user", view.get("user").textValue())
+                        .put("page", view.get("page").textValue())
+                        .put("region", region == null ? "UNKNOWN" : region);
+            })
+            .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
             .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
             .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
             .count()
             .toStream()
-            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
-                @Override
-                public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
-                    ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                    keyNode.put("window-start", key.window().start())
-                            .put("region", key.key());
-
-                    ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                    valueNode.put("count", value);
-
-                    return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-                }
+            .map((key, value) -> {
+                ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                keyNode.put("window-start", key.window().start())
+                        .put("region", key.key());
+
+                ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                valueNode.put("count", value);
+
+                return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
             });

         // write to the result topic
"UNKNOWN" : region); + }) + .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion)) .groupByKey(Serialized.with(Serdes.String(), jsonSerde)) .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000)) .count() .toStream() - .map(new KeyValueMapper, Long, KeyValue>() { - @Override - public KeyValue apply(Windowed key, Long value) { - ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); - keyNode.put("window-start", key.window().start()) - .put("region", key.key()); - - ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); - valueNode.put("count", value); - - return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); - } + .map((key, value) -> { + ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("window-start", key.window().start()) + .put("region", key.key()); + + ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); + valueNode.put("count", value); + + return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); }); // write to the result topic diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java index 5389877..d61e174 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java @@ -38,7 +38,7 @@ import java.util.concurrent.CountDownLatch; */ public class PipeDemo { - public static void main(String[] args) throws Exception { + public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java index c5eb5f9..4607d75 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java @@ -23,10 +23,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; @@ -71,7 +68,7 @@ public class TemperatureDemo { // window size within which the filtering is applied private static final int TEMPERATURE_WINDOW_SIZE = 5; - public static void main(String[] args) throws Exception { + public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature"); @@ -89,30 +86,17 @@ public class TemperatureDemo { KStream, String> max = source // temperature values are sent without a key (null), so in order // to group and reduce them, a key is needed ("temp" has been chosen) - .selectKey(new KeyValueMapper() { - @Override - public String apply(String key, String value) { - return "temp"; - } - }) + 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 7535315..4f0150e 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -23,9 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.ValueMapper;

 import java.util.Arrays;
 import java.util.Locale;
@@ -46,7 +44,7 @@ import java.util.concurrent.CountDownLatch;
  */
 public class WordCountDemo {

-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -64,18 +62,8 @@ public class WordCountDemo {
         KStream<String, String> source = builder.stream("streams-plaintext-input");

         KTable<String, Long> counts = source
-            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                @Override
-                public Iterable<String> apply(String value) {
-                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
-                }
-            })
-            .groupBy(new KeyValueMapper<String, String, String>() {
-                @Override
-                public String apply(String key, String value) {
-                    return value;
-                }
-            })
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
+            .groupBy((key, value) -> value)
             .count();

         // need to override value serde to Long type
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 523bb46..86feaeb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -61,19 +60,16 @@ public class WordCountProcessorDemo {
             @SuppressWarnings("unchecked")
             public void init(final ProcessorContext context) {
                 this.context = context;
-                this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() {
-                    @Override
-                    public void punctuate(long timestamp) {
-                        try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
-                            System.out.println("----------- " + timestamp + " ----------- ");
+                this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
+                    try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
+                        System.out.println("----------- " + timestamp + " ----------- ");

-                            while (iter.hasNext()) {
-                                KeyValue<String, Integer> entry = iter.next();
+                        while (iter.hasNext()) {
+                            KeyValue<String, Integer> entry = iter.next();

-                                System.out.println("[" + entry.key + ", " + entry.value + "]");
+                            System.out.println("[" + entry.key + ", " + entry.value + "]");

-                                context.forward(entry.key, entry.value.toString());
-                            }
+                            context.forward(entry.key, entry.value.toString());
                         }
                     }
                 });
@@ -103,7 +99,7 @@ public class WordCountProcessorDemo {
         }
     }

-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -123,7 +119,7 @@ public class WordCountProcessorDemo {
                 Stores.inMemoryKeyValueStore("Counts"),
                 Serdes.String(),
                 Serdes.Integer()),
-            "Process");
+                "Process");

         builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
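The schedule() rewrite above (and the matching one in testing.html) works because
Punctuator declares a single abstract method, punctuate(long), making it a
functional interface. A minimal sketch of the two equivalent forms; the class and
variable names are illustrative.

    import org.apache.kafka.streams.processor.Punctuator;

    public class PunctuatorLambdaSketch {
        public static void main(String[] args) {
            // Java 7 style: anonymous class
            Punctuator asAnonymousClass = new Punctuator() {
                @Override
                public void punctuate(long timestamp) {
                    System.out.println("punctuate at " + timestamp);
                }
            };

            // Java 8 style: same behavior as a lambda; inside Processor.init() it
            // would be registered via context.schedule(interval, type, lambda)
            Punctuator asLambda = timestamp -> System.out.println("punctuate at " + timestamp);

            asAnonymousClass.punctuate(0L);
            asLambda.punctuate(0L);
        }
    }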
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
index ec40d2a..bbf54e6 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/LineSplit.java
@@ -44,24 +44,10 @@ public class LineSplit {

         final StreamsBuilder builder = new StreamsBuilder();

-        builder.stream("streams-plaintext-input")
-               .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                   @Override
-                   public Iterable<String> apply(String value) {
-                       return Arrays.asList(value.split("\\W+"));
-                   }
-               })
-               .to("streams-linesplit-output");
-
-        /* ------- use the code below for Java 8 and uncomment the above ----
-
         builder.stream("streams-plaintext-input")
                .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .to("streams-linesplit-output");

-        ----------------------------------------------------------------- */
-
         final Topology topology = builder.build();

         final KafkaStreams streams = new KafkaStreams(topology, props);
         final CountDownLatch latch = new CountDownLatch(1);
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 020eb03..bdbefed 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -51,34 +51,12 @@ public class WordCount {

         final StreamsBuilder builder = new StreamsBuilder();

         builder.stream("streams-plaintext-input")
-               .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                   @Override
-                   public Iterable<String> apply(String value) {
-                       return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                   }
-               })
-               .groupBy(new KeyValueMapper<String, String, String>() {
-                   @Override
-                   public String apply(String key, String value) {
-                       return value;
-                   }
-               })
-               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
-               .toStream()
-               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
-
-
-        /* ------- use the code below for Java 8 and comment the above ----
-
-        builder.stream("streams-plaintext-input")
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
               .toStream()
               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

-        ----------------------------------------------------------------- */
-
         final Topology topology = builder.build();

         final KafkaStreams streams = new KafkaStreams(topology, props);
         final CountDownLatch latch = new CountDownLatch(1);
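Putting the quickstart pieces together, the post-commit WordCount reads roughly as
the single class below. The pipeline lines are taken from the diff; the class name
is hypothetical and the surrounding configuration mirrors the quickstart defaults
rather than being quoted from this commit.

    import java.util.Arrays;
    import java.util.Locale;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class WordCountSketch {                     // hypothetical class name
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            final StreamsBuilder builder = new StreamsBuilder();

            // split each line into lowercase words, count per word, and emit the counts
            builder.<String, String>stream("streams-plaintext-input")
                   .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                   .groupBy((key, value) -> value)
                   .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                   .toStream()
                   .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

            final Topology topology = builder.build();
            final KafkaStreams streams = new KafkaStreams(topology, props);
            final CountDownLatch latch = new CountDownLatch(1);

            // attach shutdown handler to catch control-c
            Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });

            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }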