KAFKA-3192: Add unwindowed aggregations for KStream; and make all example code executable

Author: Guozhang Wang
Reviewers: Yasuhiro Matsuda, Michael G. Noll, Jun Rao

Closes #870 from guozhangwang/K3192

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/845c6eae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/845c6eae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/845c6eae

Branch: refs/heads/trunk
Commit: 845c6eae1f6c6bcf117f5baa53bb19b4611c0528
Parents: a731297
Author: Guozhang Wang
Authored: Mon Feb 29 14:03:32 2016 -0800
Committer: Guozhang Wang
Committed: Mon Feb 29 14:03:32 2016 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |   1 +
 .../scala/kafka/tools/ConsoleConsumer.scala     |  23 +-
 .../examples/pageview/JsonPOJODeserializer.java |   5 -
 .../pageview/JsonTimestampExtractor.java        |  46 +++
 .../examples/pageview/PageViewTypedJob.java     |  88 +++++-
 .../examples/pageview/PageViewUnTypedJob.java   |  87 ++++--
 .../kafka/streams/examples/pipe/PipeJob.java    |  18 +-
 .../examples/wordcount/WordCountJob.java        |  48 ++-
 .../wordcount/WordCountProcessorJob.java        |  23 +-
 .../apache/kafka/streams/kstream/KStream.java   |  39 +++
 .../internals/KStreamAggProcessorSupplier.java  |  28 ++
 .../kstream/internals/KStreamAggWindow.java     |  51 ----
 .../kstream/internals/KStreamAggregate.java     | 119 ++------
 .../streams/kstream/internals/KStreamImpl.java  | 100 +++++--
 .../kstream/internals/KStreamReduce.java        | 124 +++----
 .../internals/KStreamWindowAggregate.java       | 171 +++++++++++
 .../kstream/internals/KStreamWindowReduce.java  | 165 +++++++++++
 .../streams/kstream/internals/KTableImpl.java   |  10 +-
 .../internals/KTableProcessorSupplier.java      |   1 -
 .../streams/processor/TopologyBuilder.java      |  26 ++
 .../processor/internals/RecordQueue.java        |   8 +
 .../internals/StreamPartitionAssignor.java      |   2 +-
 .../processor/internals/StreamThread.java       |   1 -
 .../kstream/internals/KStreamAggregateTest.java | 294 -------------------
 .../internals/KStreamWindowAggregateTest.java   | 294 +++++++++++++++++++
 .../kstream/internals/KTableAggregateTest.java  |   1 +
 .../processor/internals/PartitionGroupTest.java |   4 +-
 .../processor/internals/RecordQueueTest.java    |   4 +-
 .../processor/internals/StreamTaskTest.java     |  19 +-
 29 files changed, 1168 insertions(+), 632 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8e93973..9d24d07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -343,6 +343,7 @@ public class Sender implements Runnable {
                 handleProduceResponse(response, recordsByPartition, time.milliseconds());
             }
         };
+
         return new ClientRequest(now, acks != 0, send, callback);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 0ae057f..0d85aca 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -29,6 +29,7 @@ import kafka.utils._ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.serialization.{Deserializer, ByteArrayDeserializer} import org.apache.kafka.common.utils.Utils import org.apache.log4j.Logger @@ -349,7 +350,12 @@ class DefaultMessageFormatter extends MessageFormatter { var keySeparator = "\t".getBytes var lineSeparator = "\n".getBytes + var keyDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer() + var valDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer() + override def init(props: Properties) { + System.out.println(props) + if (props.containsKey("print.timestamp")) printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true") if (props.containsKey("print.key")) @@ -358,6 +364,19 @@ class DefaultMessageFormatter extends MessageFormatter { keySeparator = props.getProperty("key.separator").getBytes if (props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes + + if (props.containsKey("key.decoder")) { + keyDecoder = Class.forName(props.getProperty("key.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]] + + System.out.println("update key decoder") + } + if (props.containsKey("value.decoder")) { + valDecoder = Class.forName(props.getProperty("value.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]] + + System.out.println("update value decoder") + } + System.out.println(keyDecoder) + System.out.println(valDecoder) } def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) { @@ -369,10 +388,10 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(keySeparator) } if (printKey) { - output.write(if (key == null) "null".getBytes else key) + output.write(if (key == null) "null".getBytes else keyDecoder.deserialize(null, key).toString.getBytes) output.write(keySeparator) } - output.write(if (value == null) "null".getBytes else value) + output.write(if (value == null) "null".getBytes else valDecoder.deserialize(null, value).toString.getBytes) output.write(lineSeparator) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java index 583ec2d..5fcd1f3 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java @@ -16,17 +16,12 @@ **/ package org.apache.kafka.streams.examples.pageview; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; import java.util.Map; -/** - * JSON deserializer for Jackson's JsonNode tree model. 
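
(Usage sketch for the DefaultMessageFormatter change above: the key.decoder / value.decoder property names come from the diff itself, while the full console-consumer invocation around them is an illustrative assumption.)

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic streams-wordcount-output \
        --property print.key=true \
        --property key.decoder=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.decoder=org.apache.kafka.common.serialization.LongDeserializer
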
Using the tree model allows it to work with arbitrarily - * structured data without having associated Java classes. This deserializer also supports Connect schemas. - */ public class JsonPOJODeserializer implements Deserializer { private ObjectMapper objectMapper = new ObjectMapper(); http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java new file mode 100644 index 0000000..6443193 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +/** + * A timestamp extractor implementation that tries to extract event time from + * the "timestamp" field in the Json formatted message. 
+ */ +public class JsonTimestampExtractor implements TimestampExtractor { + + @Override + public long extract(ConsumerRecord record) { + if (record.value() instanceof PageViewTypedJob.PageView) { + return ((PageViewTypedJob.PageView) record.value()).timestamp; + } + + if (record.value() instanceof PageViewTypedJob.UserProfile) { + return ((PageViewTypedJob.UserProfile) record.value()).timestamp; + } + + if (record.value() instanceof JsonNode) { + return ((JsonNode) record.value()).get("timestamp").longValue(); + } + + throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java index 358cbe8..3f9b283 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.examples.pageview; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -30,22 +31,38 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.StreamsConfig; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +/** + * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, + * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes + * in Kafka Streams. + * + * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format + * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
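
(A minimal serde-configuration sketch mirroring the main() body below; "JsonPOJOClass" is the property this example's JSON POJO serializer and deserializer read to learn the target type.)

    Map<String, Object> serdeProps = new HashMap<>();
    serdeProps.put("JsonPOJOClass", PageView.class);
    Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
    pageViewDeserializer.configure(serdeProps, false); // false = configuring a value, not a key, deserializer
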
+ */ public class PageViewTypedJob { // POJO classes static public class PageView { public String user; public String page; + public Long timestamp; } static public class UserProfile { - public String user; public String region; + public Long timestamp; } static public class PageViewByRegion { @@ -66,13 +83,17 @@ public class PageViewTypedJob { public static void main(String[] args) throws Exception { Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview"); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); @@ -81,26 +102,59 @@ public class PageViewTypedJob { final Serializer longSerializer = new LongSerializer(); final Deserializer longDeserializer = new LongDeserializer(); + // TODO: the following can be removed with a serialization factory + Map serdeProps = new HashMap<>(); + + final Deserializer pageViewDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", PageView.class); + pageViewDeserializer.configure(serdeProps, false); + + final Deserializer userProfileDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", UserProfile.class); + userProfileDeserializer.configure(serdeProps, false); - KStream views = builder.stream("streams-pageview-input"); + final Serializer userProfileSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", UserProfile.class); + userProfileSerializer.configure(serdeProps, false); - KStream viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record)); + final Serializer wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); + wPageViewByRegionSerializer.configure(serdeProps, false); - KTable users = builder.table("streams-userprofile-input"); + final Serializer regionCountSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", RegionCount.class); + regionCountSerializer.configure(serdeProps, false); - KStream regionCount = viewsByUser - .leftJoin(users, (view, profile) -> { - PageViewByRegion viewByRegion = new PageViewByRegion(); - viewByRegion.user = view.user; - viewByRegion.page = view.page; - viewByRegion.region = profile.region; + KStream views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input"); - return viewByRegion; + KTable users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input"); + + KStream regionCount = views + .leftJoin(users, new ValueJoiner() { + @Override + public PageViewByRegion apply(PageView view, UserProfile profile) { + PageViewByRegion viewByRegion = new PageViewByRegion(); + viewByRegion.user = view.user; + viewByRegion.page = 
view.page; + + if (profile != null) { + viewByRegion.region = profile.region; + } else { + viewByRegion.region = "UNKNOWN"; + } + return viewByRegion; + } + }) + .map(new KeyValueMapper>() { + @Override + public KeyValue apply(String user, PageViewByRegion viewRegion) { + return new KeyValue<>(viewRegion.region, viewRegion); + } }) - .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion)) .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), stringSerializer, longSerializer, stringDeserializer, longDeserializer) + // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper, Long, KeyValue>() { @Override @@ -118,9 +172,15 @@ public class PageViewTypedJob { }); // write to the result topic - regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>()); + regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java index 2fdfa97..065f5f5 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.examples.pageview; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -31,27 +32,44 @@ import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.Windowed; import java.util.Properties; -public class PageViewUnTypedJob { +/** + * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, + * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes + * in Kafka Streams. 
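
(A sketch of the two sources defined further down; the generic type parameters shown are assumptions consistent with the JSON serdes in use.)

    KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
    KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
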
+ * + * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format + * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + */ +public class PageViewUntypedJob { public static void main(String[] args) throws Exception { Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview"); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); @@ -59,47 +77,66 @@ public class PageViewUnTypedJob { final Deserializer stringDeserializer = new StringDeserializer(); final Serializer longSerializer = new LongSerializer(); final Deserializer longDeserializer = new LongDeserializer(); - - - KStream views = builder.stream("streams-pageview-input"); - - KStream viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record)); - - KTable users = builder.table("streams-userprofile-input"); - - KTable userRegions = users.mapValues(record -> record.get("region").textValue()); - - KStream regionCount = viewsByUser - .leftJoin(userRegions, (view, region) -> { - ObjectNode jNode = JsonNodeFactory.instance.objectNode(); - - return (JsonNode) jNode.put("user", view.get("user").textValue()) - .put("page", view.get("page").textValue()) - .put("region", region); + final Serializer jsonSerializer = new JsonSerializer(); + final Deserializer jsonDeserializer = new JsonDeserializer(); + + KStream views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input"); + + KTable users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input"); + + KTable userRegions = users.mapValues(new ValueMapper() { + @Override + public String apply(JsonNode record) { + return record.get("region").textValue(); + } + }); + + KStream regionCount = views + .leftJoin(userRegions, new ValueJoiner() { + @Override + public JsonNode apply(JsonNode view, String region) { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + return jNode.put("user", view.get("user").textValue()) + .put("page", view.get("page").textValue()) + .put("region", region == null ? 
"UNKNOWN" : region); + } + }) + .map(new KeyValueMapper>() { + @Override + public KeyValue apply(String user, JsonNode viewRegion) { + return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); + } }) - .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion)) .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), stringSerializer, longSerializer, stringDeserializer, longDeserializer) + // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper, Long, KeyValue>() { @Override public KeyValue apply(Windowed key, Long value) { ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); keyNode.put("window-start", key.window().start()) - .put("region", key.window().start()); + .put("region", key.value()); ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); - keyNode.put("count", value); + valueNode.put("count", value); - return new KeyValue((JsonNode) keyNode, (JsonNode) valueNode); + return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); } }); // write to the result topic - regionCount.to("streams-pageviewstats-output"); + regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java index 841f37b..9e737ba 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java @@ -26,6 +26,16 @@ import org.apache.kafka.streams.StreamsConfig; import java.util.Properties; +/** + * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to + * write data to a sink (output) topic. + * + * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input" + * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output". + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
+ */
public class PipeJob {

    public static void main(String[] args) throws Exception {
@@ -37,7 +47,7 @@ public class PipeJob {
        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

-        // can specify underlying client configs if necessary
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();
@@ -46,5 +56,11 @@
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
+
+        // usually the streaming job would run forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
    }
}

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index b922695..da6b095 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -17,9 +17,6 @@
 package org.apache.kafka.streams.examples.wordcount;

-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -27,21 +24,29 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.UnlimitedWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;

 import java.util.Arrays;
 import java.util.Properties;

+/**
+ * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh).
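
(A condensed sketch of the reworked pipeline in main() below, which now ends in the new unwindowed countByKey; the exact tokenization shown is an assumption.)

    KTable<String, Long> counts = source
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
        .map((key, value) -> new KeyValue<>(value, value))
        .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");
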
Otherwise you won't see any data arriving in the output topic. + */ public class WordCountJob { public static void main(String[] args) throws Exception { @@ -54,7 +59,7 @@ public class WordCountJob { props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - // can specify underlying client configs if necessary + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); @@ -63,11 +68,10 @@ public class WordCountJob { final Deserializer stringDeserializer = new StringDeserializer(); final Serializer longSerializer = new LongSerializer(); final Deserializer longDeserializer = new LongDeserializer(); - final Serializer JsonSerializer = new JsonSerializer(); KStream source = builder.stream("streams-file-input"); - KStream counts = source + KTable counts = source .flatMapValues(new ValueMapper>() { @Override public Iterable apply(String value) { @@ -79,25 +83,17 @@ public class WordCountJob { return new KeyValue(value, value); } }) - .countByKey(UnlimitedWindows.of("Counts").startOn(0L), - stringSerializer, longSerializer, - stringDeserializer, longDeserializer) - .toStream() - .map(new KeyValueMapper, Long, KeyValue>() { - @Override - public KeyValue apply(Windowed key, Long value) { - ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts"); - jNode.put("word", key.value()) - .put("count", value); - - return new KeyValue(null, jNode); - } - }); - - counts.to("streams-wordcount-output", stringSerializer, JsonSerializer); + counts.to("streams-wordcount-output", stringSerializer, longSerializer); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java index 63692bd..61e8335 100644 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java @@ -33,6 +33,17 @@ import org.apache.kafka.streams.state.Stores; import java.util.Properties; +/** + * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program + * that computes a simple word occurrence histogram from an input text. + * + * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages + * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record + * is an updated count of a single word. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. 
via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + */ public class WordCountProcessorJob { private static class MyProcessorSupplier implements ProcessorSupplier { @@ -72,7 +83,7 @@ public class WordCountProcessorJob { public void punctuate(long timestamp) { KeyValueIterator iter = this.kvStore.all(); - System.out.println("----------- " + timestamp + "----------- "); + System.out.println("----------- " + timestamp + " ----------- "); while (iter.hasNext()) { KeyValue entry = iter.next(); @@ -103,7 +114,7 @@ public class WordCountProcessorJob { props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - // can specify underlying client configs if necessary + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); TopologyBuilder builder = new TopologyBuilder(); @@ -113,9 +124,15 @@ public class WordCountProcessorJob { builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); - builder.addSink("Sink", "streams-wordcount-output", "Process"); + builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index b83298f..231eb22 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -283,6 +283,18 @@ public interface KStream { /** * Aggregate values of this stream by key on a window basis. * + * @param reducer the class of Reducer + */ + KTable reduceByKey(Reducer reducer, + Serializer keySerializer, + Serializer aggValueSerializer, + Deserializer keyDeserializer, + Deserializer aggValueDeserializer, + String name); + + /** + * Aggregate values of this stream by key on a window basis. + * * @param initializer the class of Initializer * @param aggregator the class of Aggregator * @param windows the specification of the aggregation window @@ -297,6 +309,22 @@ public interface KStream { Deserializer aggValueDeserializer); /** + * Aggregate values of this stream by key without a window basis, and hence + * return an ever updating table + * + * @param initializer the class of Initializer + * @param aggregator the class of Aggregator + * @param the value type of the aggregated table + */ + KTable aggregateByKey(Initializer initializer, + Aggregator aggregator, + Serializer keySerializer, + Serializer aggValueSerializer, + Deserializer keyDeserializer, + Deserializer aggValueDeserializer, + String name); + + /** * Count number of messages of this stream by key on a window basis. 
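
(For example, the page-view jobs in this commit count on a seven-day hopping window; "stream" is an illustrative variable.)

    stream.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
                      stringSerializer, longSerializer, stringDeserializer, longDeserializer);
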
* * @param windows the specification of the aggregation window @@ -306,4 +334,15 @@ public interface KStream { Serializer aggValueSerializer, Deserializer keyDeserializer, Deserializer aggValueDeserializer); + + /** + * Count number of messages of this stream by key without a window basis, and hence + * return a ever updating counting table + * + */ + KTable countByKey(Serializer keySerializer, + Serializer aggValueSerializer, + Deserializer keyDeserializer, + Deserializer aggValueDeserializer, + String name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java new file mode 100644 index 0000000..deb98ed --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public interface KStreamAggProcessorSupplier extends ProcessorSupplier { + + KTableValueGetterSupplier view(); + + void enableSendingOldValues(); +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java deleted file mode 100644 index f02f53a..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -public class KStreamAggWindow implements ProcessorSupplier { - - @Override - public Processor get() { - return new KStreamAggWindowProcessor(); - } - - private class KStreamAggWindowProcessor extends AbstractProcessor { - - @SuppressWarnings("unchecked") - @Override - public void init(ProcessorContext context) { - super.init(context); - } - - @Override - public void process(K key, V value) { - // create a dummy window just for wrapping the timestamp - long timestamp = context().timestamp(); - - // send the new aggregate value - context().forward(new Windowed<>(key, new UnlimitedWindow(timestamp)), new Change<>(value, null)); - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index b64277c..f41bfa6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -18,38 +18,28 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Initializer; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.KeyValueStore; -import java.util.Iterator; -import java.util.Map; - -public class KStreamAggregate implements KTableProcessorSupplier, V, T> { +public class KStreamAggregate implements KStreamAggProcessorSupplier { private final String storeName; - private final Windows windows; private final Initializer initializer; private final Aggregator aggregator; private boolean sendOldValues = false; - public KStreamAggregate(Windows windows, String storeName, Initializer initializer, Aggregator aggregator) { - this.windows = windows; + public KStreamAggregate(String storeName, Initializer initializer, Aggregator aggregator) { this.storeName = storeName; this.initializer = initializer; this.aggregator = aggregator; } @Override - public Processor, Change> get() { + public Processor get() { return new KStreamAggregateProcessor(); } @@ -58,117 +48,68 @@ public class KStreamAggregate implements KTableProces sendOldValues = true; } - private class KStreamAggregateProcessor extends AbstractProcessor, Change> { + private class KStreamAggregateProcessor extends AbstractProcessor { - private WindowStore windowStore; + private 
KeyValueStore store; @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); - windowStore = (WindowStore) context.getStateStore(storeName); + store = (KeyValueStore) context.getStateStore(storeName); } @Override - public void process(Windowed windowedKey, Change change) { - // first get the matching windows - long timestamp = windowedKey.window().start(); - K key = windowedKey.value(); - V value = change.newValue; - - Map matchedWindows = windows.windowsFor(timestamp); - - long timeFrom = Long.MAX_VALUE; - long timeTo = Long.MIN_VALUE; - - // use range query on window store for efficient reads - for (long windowStartMs : matchedWindows.keySet()) { - timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom; - timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; - } - - WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo); - - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue entry = iter.next(); - W window = matchedWindows.get(entry.key); - - if (window != null) { - - T oldAgg = entry.value; + public void process(K key, V value) { + T oldAgg = store.get(key); - if (oldAgg == null) - oldAgg = initializer.apply(); + if (oldAgg == null) + oldAgg = initializer.apply(); - // try to add the new new value (there will never be old value) - T newAgg = aggregator.apply(key, value, oldAgg); + T newAgg = oldAgg; - // update the store with the new value - windowStore.put(key, newAgg, window.start()); - - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - - matchedWindows.remove(entry.key); - } + // try to add the new new value + if (value != null) { + newAgg = aggregator.apply(key, value, newAgg); } - iter.close(); - - // create the new window for the rest of unmatched window that do not exist yet - for (long windowStartMs : matchedWindows.keySet()) { - T oldAgg = initializer.apply(); - T newAgg = aggregator.apply(key, value, oldAgg); + // update the store with the new value + store.put(key, newAgg); - windowStore.put(key, newAgg, windowStartMs); - - // send the new aggregate pair - if (sendOldValues) - context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null)); - } + // send the old / new pair + if (sendOldValues) + context().forward(key, new Change<>(newAgg, oldAgg)); + else + context().forward(key, new Change<>(newAgg, null)); } } @Override - public KTableValueGetterSupplier, T> view() { + public KTableValueGetterSupplier view() { - return new KTableValueGetterSupplier, T>() { + return new KTableValueGetterSupplier() { - public KTableValueGetter, T> get() { + public KTableValueGetter get() { return new KStreamAggregateValueGetter(); } }; } - private class KStreamAggregateValueGetter implements KTableValueGetter, T> { + private class KStreamAggregateValueGetter implements KTableValueGetter { - private WindowStore windowStore; + private KeyValueStore store; @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { - windowStore = (WindowStore) context.getStateStore(storeName); + store = (KeyValueStore) context.getStateStore(storeName); } - @SuppressWarnings("unchecked") @Override - public T 
get(Windowed windowedKey) { - K key = windowedKey.value(); - W window = (W) windowedKey.window(); - - // this iterator should contain at most one element - Iterator> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.hasNext() ? iter.next().value : null; + public T get(K key) { + return store.get(key); } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 79a3115..9f384ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -385,6 +385,7 @@ public class KStreamImpl extends AbstractStream implements KStream((KTableImpl) other, joiner), this.name); + topology.connectProcessors(this.name, ((KTableImpl) other).name); return new KStreamImpl<>(topology, name, allSourceNodes); } @@ -397,15 +398,11 @@ public class KStreamImpl extends AbstractStream implements KStream keyDeserializer, Deserializer aggValueDeserializer) { - // TODO: this agg window operator is only used for casting K to Windowed for - // KTableProcessorSupplier, which is a bit awkward and better be removed in the future String reduceName = topology.newName(REDUCE_NAME); - String selectName = topology.newName(SELECT_NAME); - ProcessorSupplier aggWindowSupplier = new KStreamAggWindow<>(); - ProcessorSupplier, Change> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer); + KStreamWindowReduce reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer); - StateStoreSupplier aggregateStore = Stores.create(windows.name()) + StateStoreSupplier reduceStore = Stores.create(windows.name()) .withKeys(keySerializer, keyDeserializer) .withValues(aggValueSerializer, aggValueDeserializer) .persistent() @@ -413,12 +410,37 @@ public class KStreamImpl extends AbstractStream implements KStream(topology, reduceName, reduceSupplier, sourceNodes); + } + + @Override + public KTable reduceByKey(Reducer reducer, + Serializer keySerializer, + Serializer aggValueSerializer, + Deserializer keyDeserializer, + Deserializer aggValueDeserializer, + String name) { + + String reduceName = topology.newName(REDUCE_NAME); + + KStreamReduce reduceSupplier = new KStreamReduce<>(name, reducer); + + StateStoreSupplier reduceStore = Stores.create(name) + .withKeys(keySerializer, keyDeserializer) + .withValues(aggValueSerializer, aggValueDeserializer) + .persistent() + .build(); + + // aggregate the values with the aggregator and local store + topology.addProcessor(reduceName, reduceSupplier, this.name); + topology.addStateStore(reduceStore, reduceName); // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(topology, reduceName, aggregateSupplier, sourceNodes); + return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes); } @Override @@ -430,13 +452,9 @@ public class KStreamImpl extends AbstractStream implements KStream keyDeserializer, Deserializer aggValueDeserializer) { - // TODO: this agg window operator is only used for casting K to Windowed for - // KTableProcessorSupplier, which is a bit awkward and better be removed in the future String aggregateName = 
topology.newName(AGGREGATE_NAME); - String selectName = topology.newName(SELECT_NAME); - ProcessorSupplier aggWindowSupplier = new KStreamAggWindow<>(); - ProcessorSupplier, Change> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), initializer, aggregator); + KStreamAggProcessorSupplier, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator); StateStoreSupplier aggregateStore = Stores.create(windows.name()) .withKeys(keySerializer, keyDeserializer) @@ -446,8 +464,34 @@ public class KStreamImpl extends AbstractStream implements KStream, T, T>(topology, aggregateName, aggregateSupplier, sourceNodes); + } + + @Override + public KTable aggregateByKey(Initializer initializer, + Aggregator aggregator, + Serializer keySerializer, + Serializer aggValueSerializer, + Deserializer keyDeserializer, + Deserializer aggValueDeserializer, + String name) { + + String aggregateName = topology.newName(AGGREGATE_NAME); + + KStreamAggProcessorSupplier aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator); + + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerializer, keyDeserializer) + .withValues(aggValueSerializer, aggValueDeserializer) + .persistent() + .build(); + + // aggregate the values with the aggregator and local store + topology.addProcessor(aggregateName, aggregateSupplier, this.name); topology.addStateStore(aggregateStore, aggregateName); // return the KTable representation with the intermediate topic as the sources @@ -474,4 +518,26 @@ public class KStreamImpl extends AbstractStream implements KStream countByKey(Serializer keySerializer, + Serializer aggValueSerializer, + Deserializer keyDeserializer, + Deserializer aggValueDeserializer, + String name) { + + return this.aggregateByKey( + new Initializer() { + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate + 1L; + } + }, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer, name); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index c484c7b..0ec0465 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -17,37 +17,27 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.KeyValueStore; -import java.util.Iterator; -import java.util.Map; - -public class KStreamReduce implements KTableProcessorSupplier, V, V> { +public class KStreamReduce implements 
KStreamAggProcessorSupplier { private final String storeName; - private final Windows windows; private final Reducer reducer; private boolean sendOldValues = false; - public KStreamReduce(Windows windows, String storeName, Reducer reducer) { - this.windows = windows; + public KStreamReduce(String storeName, Reducer reducer) { this.storeName = storeName; this.reducer = reducer; } @Override - public Processor, Change> get() { - return new KStreamAggregateProcessor(); + public Processor get() { + return new KStreamReduceProcessor(); } @Override @@ -55,113 +45,69 @@ public class KStreamReduce implements KTableProcessorSup sendOldValues = true; } - private class KStreamAggregateProcessor extends AbstractProcessor, Change> { + private class KStreamReduceProcessor extends AbstractProcessor { - private WindowStore windowStore; + private KeyValueStore store; @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); - windowStore = (WindowStore) context.getStateStore(storeName); + store = (KeyValueStore) context.getStateStore(storeName); } @Override - public void process(Windowed windowedKey, Change change) { - // first get the matching windows - long timestamp = windowedKey.window().start(); - K key = windowedKey.value(); - V value = change.newValue; - - Map matchedWindows = windows.windowsFor(timestamp); - - long timeFrom = Long.MAX_VALUE; - long timeTo = Long.MIN_VALUE; - - // use range query on window store for efficient reads - for (long windowStartMs : matchedWindows.keySet()) { - timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom; - timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; - } - - WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo); - - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue entry = iter.next(); - W window = matchedWindows.get(entry.key); - - if (window != null) { - - V oldAgg = entry.value; - V newAgg = oldAgg; - - // try to add the new new value (there will never be old value) - if (newAgg == null) { - newAgg = value; - } else { - newAgg = reducer.apply(newAgg, value); - } - - // update the store with the new value - windowStore.put(key, newAgg, window.start()); - - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - - matchedWindows.remove(entry.key); + public void process(K key, V value) { + V oldAgg = store.get(key); + V newAgg = oldAgg; + + // try to add the new new value + if (value != null) { + if (newAgg == null) { + newAgg = value; + } else { + newAgg = reducer.apply(newAgg, value); } } - iter.close(); + // update the store with the new value + store.put(key, newAgg); - // create the new window for the rest of unmatched window that do not exist yet - for (long windowStartMs : matchedWindows.keySet()) { - windowStore.put(key, value, windowStartMs); - - // send the new aggregate pair (there will be no old value) - context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null)); - } + // send the old / new pair + if (sendOldValues) + context().forward(key, new Change<>(newAgg, oldAgg)); + else + context().forward(key, new Change<>(newAgg, null)); } } @Override - public KTableValueGetterSupplier, V> view() { + public KTableValueGetterSupplier view() { - return new KTableValueGetterSupplier, V>() { 
+ return new KTableValueGetterSupplier() { - public KTableValueGetter, V> get() { - return new KStreamAggregateValueGetter(); + public KTableValueGetter get() { + return new KStreamReduceValueGetter(); } }; } - private class KStreamAggregateValueGetter implements KTableValueGetter, V> { + private class KStreamReduceValueGetter implements KTableValueGetter { - private WindowStore windowStore; + private KeyValueStore store; @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { - windowStore = (WindowStore) context.getStateStore(storeName); + store = (KeyValueStore) context.getStateStore(storeName); } - @SuppressWarnings("unchecked") @Override - public V get(Windowed windowedKey) { - K key = windowedKey.value(); - W window = (W) windowedKey.window(); - - // this iterator should only contain one element - Iterator> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.next().value; + public V get(K key) { + return store.get(key); } - } } + http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java new file mode 100644 index 0000000..76964f9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
new file mode 100644
index 0000000..76964f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+
+    private final String storeName;
+    private final Windows<W> windows;
+    private final Initializer<T> initializer;
+    private final Aggregator<K, V, T> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamWindowAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamWindowAggregateProcessor();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            // first get the matching windows
+            long timestamp = context().timestamp();
+
+            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+            long timeFrom = Long.MAX_VALUE;
+            long timeTo = Long.MIN_VALUE;
+
+            // use range query on window store for efficient reads
+            for (long windowStartMs : matchedWindows.keySet()) {
+                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+            }
+
+            WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+            // for each matching window, try to update the corresponding key and send to the downstream
+            while (iter.hasNext()) {
+                KeyValue<Long, T> entry = iter.next();
+                W window = matchedWindows.get(entry.key);
+
+                if (window != null) {
+
+                    T oldAgg = entry.value;
+
+                    if (oldAgg == null)
+                        oldAgg = initializer.apply();
+
+                    // try to add the new value (there will never be an old value)
+                    T newAgg = aggregator.apply(key, value, oldAgg);
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, window.start());
+
+                    // forward the aggregated change pair
+                    if (sendOldValues)
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                    else
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+                    matchedWindows.remove(entry.key);
+                }
+            }
+
+            iter.close();
+
+            // create new windows for the rest of the unmatched windows that do not exist yet
+            for (long windowStartMs : matchedWindows.keySet()) {
+                T oldAgg = initializer.apply();
+                T newAgg = aggregator.apply(key, value, oldAgg);
+
+                windowStore.put(key, newAgg, windowStartMs);
+
+                // send the new aggregate pair
+                if (sendOldValues)
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
+                else
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, T> view() {
+
+        return new KTableValueGetterSupplier<Windowed<K>, T>() {
+
+            public KTableValueGetter<Windowed<K>, T> get() {
+                return new KStreamWindowAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T get(Windowed<K> windowedKey) {
+            K key = windowedKey.value();
+            W window = (W) windowedKey.window();
+
+            // this iterator should contain at most one element
+            Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
+
+            return iter.hasNext() ? iter.next().value : null;
+        }
+
+    }
+}
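
The subtle part of KStreamWindowAggregateProcessor.process() is the window-matching step: windowsFor(timestamp) enumerates every window that contains the record's timestamp, and the min/max scan over the window start times collapses all lookups into a single range fetch on the window store. The standalone sketch below illustrates just that step. The hand-rolled hopping-window enumeration (size 10, advance 5) is a stand-in for Windows.windowsFor() and is not the real implementation; only the min/max scan mirrors the processor, written here with Math.min/Math.max instead of ternaries.

import java.util.HashMap;
import java.util.Map;

// Sketch of the window-matching step: enumerate the start times of all
// windows containing a timestamp, then derive the [timeFrom, timeTo]
// bounds used for one range fetch on the window store.
public class WindowMatchSketch {

    // start times of all hopping windows (given size/advance) containing the
    // timestamp, mimicking the Map<Long, W> returned by windowsFor(timestamp)
    static Map<Long, long[]> windowsFor(long timestamp, long size, long advance) {
        Map<Long, long[]> matched = new HashMap<>();
        long firstStart = timestamp - size + 1;
        for (long start = Math.max(0, firstStart); start <= timestamp; start++) {
            if (start % advance == 0)
                matched.put(start, new long[]{start, start + size});
        }
        return matched;
    }

    public static void main(String[] args) {
        Map<Long, long[]> matched = windowsFor(17L, 10L, 5L);

        long timeFrom = Long.MAX_VALUE;
        long timeTo = Long.MIN_VALUE;
        for (long windowStartMs : matched.keySet()) {   // same scan as the processor
            timeFrom = Math.min(timeFrom, windowStartMs);
            timeTo = Math.max(timeTo, windowStartMs);
        }

        // windows [10,20) and [15,25) contain t=17, so the fetch range is [10, 15]
        System.out.println("fetch range: [" + timeFrom + ", " + timeTo + "]");
    }
}
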
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
new file mode 100644
index 0000000..d532e79
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
+
+    private final String storeName;
+    private final Windows<W> windows;
+    private final Reducer<V> reducer;
+
+    private boolean sendOldValues = false;
+
+    public KStreamWindowReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.reducer = reducer;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamWindowReduceProcessor();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamWindowReduceProcessor extends AbstractProcessor<K, V> {
+
+        private WindowStore<K, V> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            // first get the matching windows
+            long timestamp = context().timestamp();
+
+            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+            long timeFrom = Long.MAX_VALUE;
+            long timeTo = Long.MIN_VALUE;
+
+            // use range query on window store for efficient reads
+            for (long windowStartMs : matchedWindows.keySet()) {
+                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+            }
+
+            WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+            // for each matching window, try to update the corresponding key and send to the downstream
+            while (iter.hasNext()) {
+                KeyValue<Long, V> entry = iter.next();
+                W window = matchedWindows.get(entry.key);
+
+                if (window != null) {
+
+                    V oldAgg = entry.value;
+                    V newAgg = oldAgg;
+
+                    // try to add the new value (there will never be an old value)
+                    if (newAgg == null) {
+                        newAgg = value;
+                    } else {
+                        newAgg = reducer.apply(newAgg, value);
+                    }
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, window.start());
+
+                    // forward the aggregated change pair
+                    if (sendOldValues)
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                    else
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+                    matchedWindows.remove(entry.key);
+                }
+            }
+
+            iter.close();
+
+            // create new windows for the rest of the unmatched windows that do not exist yet
+            for (long windowStartMs : matchedWindows.keySet()) {
+                windowStore.put(key, value, windowStartMs);
+
+                // send the new aggregate pair (there will be no old value)
+                context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, V> view() {
+
+        return new KTableValueGetterSupplier<Windowed<K>, V>() {
+
+            public KTableValueGetter<Windowed<K>, V> get() {
+                return new KStreamAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+
+        private WindowStore<K, V> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public V get(Windowed<K> windowedKey) {
+            K key = windowedKey.value();
+            W window = (W) windowedKey.window();
+
+            // this iterator should only contain one element
+            Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
+
+            return iter.next().value;
+        }
+
+    }
+}
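
KStreamWindowReduce mirrors KStreamWindowAggregate almost line for line; the difference worth noting is seeding. The aggregate version initializes a brand-new window with Initializer.apply() and then applies the Aggregator, while the reduce version stores the first value directly and only calls the Reducer from the second value onward. A toy illustration of that distinction, with the functional interfaces inlined so it compiles on its own:

// Contrast in how the two windowed operators seed a brand-new window.
public class SeedingSketch {

    interface Initializer<T> { T apply(); }
    interface Aggregator<K, V, T> { T apply(K key, V value, T aggregate); }
    interface Reducer<V> { V apply(V agg, V value); }

    public static void main(String[] args) {
        Initializer<Long> init = () -> 0L;
        Aggregator<String, Long, Long> agg = (k, v, a) -> a + v;
        Reducer<Long> red = (a, v) -> a + v;

        // first record for a brand-new window, value = 7
        Long aggregateSeed = agg.apply("k", 7L, init.apply()); // aggregate: 0 + 7 = 7
        Long reduceSeed = 7L;                                  // reduce: value stored as-is

        // second record, value = 3: from here on both call their combine function
        Long aggregateNext = agg.apply("k", 3L, aggregateSeed); // 10
        Long reduceNext = red.apply(reduceSeed, 3L);            // 10

        System.out.println(aggregateNext + " == " + reduceNext);
    }
}

For a summing aggregation the two coincide; the Initializer matters because it lets the aggregate type T differ from the value type V, which a Reducer cannot express.
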
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index fa4cd93..b82582b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -76,7 +76,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {

-    public final ProcessorSupplier<K, V> processorSupplier;
+    public final ProcessorSupplier<?, ?> processorSupplier;

     private final Serializer<K> keySerializer;
     private final Serializer<V> valSerializer;
@@ -87,14 +87,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {

     public KTableImpl(KStreamBuilder topology,
                       String name,
-                      ProcessorSupplier<K, V> processorSupplier,
+                      ProcessorSupplier<?, ?> processorSupplier,
                       Set<String> sourceNodes) {
         this(topology, name, processorSupplier, sourceNodes, null, null, null, null);
     }

     public KTableImpl(KStreamBuilder topology,
                       String name,
-                      ProcessorSupplier<K, V> processorSupplier,
+                      ProcessorSupplier<?, ?> processorSupplier,
                       Set<String> sourceNodes,
                       Serializer<K> keySerializer,
                       Serializer<V> valSerializer,
@@ -389,6 +389,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
             KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             materialize(source);
             return new KTableSourceValueGetterSupplier<>(source.topic);
+        } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
+            return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
         } else {
             return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
         }
@@ -401,6 +403,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
             KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             materialize(source);
             source.enableSendingOldValues();
+        } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
+            ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
         } else {
             ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index d647b72..df03280 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -24,5 +24,4 @@ public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, Change<V>> {

     KTableValueGetterSupplier<K, T> view();

     void enableSendingOldValues();
-
 }
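
The KTableImpl change is the glue for all of the above: a table's backing supplier is now either a KTableProcessorSupplier or a KStreamAggProcessorSupplier, and the instanceof dispatch is needed because the two cannot share a supertype richer than ProcessorSupplier. One consumes Change pairs from an upstream table, the other consumes raw stream records. The declarations below are a simplified sketch of that type-level contrast (view() return types collapsed to Object); they are not the actual interfaces:

// Simplified shapes of the two supplier kinds, for illustration only.
interface ProcessorSupplier<K, V> { /* Processor<K, V> get(); */ }

// consumes Change<V> records produced by an upstream KTable
interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, Change<V>> {
    Object view();                   // KTableValueGetterSupplier<K, T> in the real code
    void enableSendingOldValues();
}

// consumes raw stream records; RK is the (possibly windowed) result key type
interface KStreamAggProcessorSupplier<K, RK, V, T> extends ProcessorSupplier<K, V> {
    Object view();                   // KTableValueGetterSupplier<RK, T> in the real code
    void enableSendingOldValues();
}

// stripped-down stand-in for the internal Change<V> container
class Change<V> { V newValue; V oldValue; }

Because the value type parameter of ProcessorSupplier differs (Change<V> versus V), KTableImpl stores the supplier as ProcessorSupplier<?, ?> and casts after the runtime check, which is exactly what the two new else-if branches do.
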