kafka-commits mailing list archives

From: ewe...@apache.org
Subject: kafka git commit: MINOR: Move streams-examples source files under src folder
Date: Wed, 02 Mar 2016 02:54:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 79662cc7c -> edeb11bc5


MINOR: Move streams-examples source files under src folder

Also remove some unused imports.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #992 from guozhangwang/KSExamples


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/edeb11bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/edeb11bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/edeb11bc

Branch: refs/heads/trunk
Commit: edeb11bc561c0721855b34a4506dc7f1c76445c6
Parents: 79662cc
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Mar 1 18:53:58 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Mar 1 18:53:58 2016 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   5 +
 build.gradle                                    |   3 +-
 checkstyle/import-control.xml                   |   5 +
 .../examples/pageview/JsonPOJODeserializer.java |  61 ------
 .../examples/pageview/JsonPOJOSerializer.java   |  60 ------
 .../pageview/JsonTimestampExtractor.java        |  46 -----
 .../examples/pageview/PageViewTypedJob.java     | 186 -------------------
 .../examples/pageview/PageViewUnTypedJob.java   | 142 --------------
 .../kafka/streams/examples/pipe/PipeJob.java    |  66 -------
 .../examples/wordcount/WordCountJob.java        |  99 ----------
 .../wordcount/WordCountProcessorJob.java        | 138 --------------
 .../examples/pageview/JsonPOJODeserializer.java |  61 ++++++
 .../examples/pageview/JsonPOJOSerializer.java   |  60 ++++++
 .../pageview/JsonTimestampExtractor.java        |  46 +++++
 .../examples/pageview/PageViewTypedJob.java     | 185 ++++++++++++++++++
 .../examples/pageview/PageViewUntypedJob.java   | 141 ++++++++++++++
 .../kafka/streams/examples/pipe/PipeJob.java    |  65 +++++++
 .../examples/wordcount/WordCountJob.java        |  98 ++++++++++
 .../wordcount/WordCountProcessorJob.java        | 137 ++++++++++++++
 19 files changed, 805 insertions(+), 799 deletions(-)
----------------------------------------------------------------------
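The relocation itself is mechanical: each example source file moves from streams/examples/main/java into the conventional streams/examples/src/main/java layout (the full paths appear in the per-file headers below). A rough shell equivalent of the move — a sketch, not necessarily how the commit was produced, since the unused-import cleanup and the PageViewUnTypedJob.java -> PageViewUntypedJob.java rename visible in the stats sit on top of it:

  git mv streams/examples/main/java streams/examples/src/main/java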


http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 344e47f..f45d8d4 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -52,6 +52,11 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/streams/examples/build/libs/kafka-streams-examples*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
 do
   CLASSPATH=$CLASSPATH:$file

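The new loop above adds the kafka-streams-examples jar to the CLASSPATH assembled by kafka-run-class.sh, so the demos can be launched directly from a build. A hypothetical invocation (class name taken from the sources below; assumes the examples jar has been built and a broker is reachable at localhost:9092, as the examples expect):

  bin/kafka-run-class.sh org.apache.kafka.streams.examples.pipe.PipeJob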
http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 5953f57..daf468f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -259,7 +259,7 @@ for ( sv in ['2_10', '2_11'] ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
 tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10', 'jar_core_2_11'] + pkgs.collect { it + ":jar" }) { }
@@ -374,6 +374,7 @@ project(':core') {
     from(project(':connect:file').jar) { into("libs/") }
     from(project(':connect:file').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
+    from(project(':streams:examples').jar) { into("libs/") }
   }
 
   jar {

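Listing streams:examples in pkgs wires the new module into the aggregate tasks derived from pkgs.collect { it + ":jar" } (jarAll and friends), and the added from(...) line bundles its jar into libs/ of the release tarball. Building just the examples jar would then be, as a sketch:

  ./gradlew streams:examples:jar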
http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b183b3d..051c8d1 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -127,6 +127,11 @@
 
     <allow pkg="org.apache.kafka.streams"/>
 
+    <subpackage name="examples">
+      <allow pkg="com.fasterxml.jackson.databind" />
+      <allow pkg="org.apache.kafka.connect.json" />
+    </subpackage>
+
     <subpackage name="state">
       <allow pkg="org.rocksdb" />
     </subpackage>

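These two allows cover exactly the third-party imports used by the relocated examples: com.fasterxml.jackson.databind for the JSON POJO serdes and the timestamp extractor, and org.apache.kafka.connect.json for the serializers in the untyped page-view demo. Without them the import-control check would reject the new files.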
http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
deleted file mode 100644
index 5fcd1f3..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.util.Map;
-
-public class JsonPOJODeserializer<T> implements Deserializer<T> {
-    private ObjectMapper objectMapper = new ObjectMapper();
-
-    private Class<T> tClass;
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonPOJODeserializer() {
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure(Map<String, ?> props, boolean isKey) {
-        tClass = (Class<T>) props.get("JsonPOJOClass");
-    }
-
-    @Override
-    public T deserialize(String topic, byte[] bytes) {
-        if (bytes == null)
-            return null;
-
-        T data;
-        try {
-            data = objectMapper.readValue(bytes, tClass);
-        } catch (Exception e) {
-            throw new SerializationException(e);
-        }
-
-        return data;
-    }
-
-    @Override
-    public void close() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
deleted file mode 100644
index bb60327..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.streams.examples.pageview;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.util.Map;
-
-public class JsonPOJOSerializer<T> implements Serializer<T> {
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    private Class<T> tClass;
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonPOJOSerializer() {
-
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure(Map<String, ?> props, boolean isKey) {
-        tClass = (Class<T>) props.get("JsonPOJOClass");
-    }
-
-    @Override
-    public byte[] serialize(String topic, T data) {
-        if (data == null)
-            return null;
-
-        try {
-            return objectMapper.writeValueAsBytes(data);
-        } catch (Exception e) {
-            throw new SerializationException("Error serializing JSON message", e);
-        }
-    }
-
-    @Override
-    public void close() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
deleted file mode 100644
index 6443193..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-/**
- * A timestamp extractor implementation that tries to extract event time from
- * the "timestamp" field in the Json formatted message.
- */
-public class JsonTimestampExtractor implements TimestampExtractor {
-
-    @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
-        if (record.value() instanceof PageViewTypedJob.PageView) {
-            return ((PageViewTypedJob.PageView) record.value()).timestamp;
-        }
-
-        if (record.value() instanceof PageViewTypedJob.UserProfile) {
-            return ((PageViewTypedJob.UserProfile) record.value()).timestamp;
-        }
-
-        if (record.value() instanceof JsonNode) {
-            return ((JsonNode) record.value()).get("timestamp").longValue();
-        }
-
-        throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
deleted file mode 100644
index f7266e3..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from  a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewTypedJob {
-
-    // POJO classes
-    static public class PageView {
-        public String user;
-        public String page;
-        public Long timestamp;
-    }
-
-    static public class UserProfile {
-        public String region;
-        public Long timestamp;
-    }
-
-    static public class PageViewByRegion {
-        public String user;
-        public String page;
-        public String region;
-    }
-
-    static public class WindowedPageViewByRegion {
-        public long windowStart;
-        public String region;
-    }
-
-    static public class RegionCount {
-        public long count;
-        public String region;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
-
-        // TODO: the following can be removed with a serialization factory
-        Map<String, Object> serdeProps = new HashMap<>();
-
-        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", PageView.class);
-        pageViewDeserializer.configure(serdeProps, false);
-
-        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileDeserializer.configure(serdeProps, false);
-
-        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileSerializer.configure(serdeProps, false);
-
-        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
-        wPageViewByRegionSerializer.configure(serdeProps, false);
-
-        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", RegionCount.class);
-        regionCountSerializer.configure(serdeProps, false);
-
-        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
-
-        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
-
-        KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
-                    @Override
-                    public PageViewByRegion apply(PageView view, UserProfile profile) {
-                        PageViewByRegion viewByRegion = new PageViewByRegion();
-                        viewByRegion.user = view.user;
-                        viewByRegion.page = view.page;
-
-                        if (profile != null) {
-                            viewByRegion.region = profile.region;
-                        } else {
-                            viewByRegion.region = "UNKNOWN";
-                        }
-                        return viewByRegion;
-                    }
-                })
-                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
-                    @Override
-                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
-                        return new KeyValue<>(viewRegion.region, viewRegion);
-                    }
-                })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, longSerializer,
-                        stringDeserializer, longDeserializer)
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
-                    @Override
-                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
-                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                        wViewByRegion.windowStart = key.window().start();
-                        wViewByRegion.region = key.value();
-
-                        RegionCount rCount = new RegionCount();
-                        rCount.region = key.value();
-                        rCount.count = value;
-
-                        return new KeyValue<>(wViewByRegion, rCount);
-                    }
-                });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
deleted file mode 100644
index 3241b8f..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.kafka.connect.json.JsonDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from  a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewUntypedJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
-        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
-        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
-
-        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
-
-        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
-
-        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
-            @Override
-            public String apply(JsonNode record) {
-                return record.get("region").textValue();
-            }
-        });
-
-        KStream<JsonNode, JsonNode> regionCount = views
-                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
-                            @Override
-                            public JsonNode apply(JsonNode view, String region) {
-                                ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                                return jNode.put("user", view.get("user").textValue())
-                                        .put("page", view.get("page").textValue())
-                                        .put("region", region == null ? "UNKNOWN" : region);
-                            }
-                        })
-                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
-                    @Override
-                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
-                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
-                    }
-                })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, longSerializer,
-                        stringDeserializer, longDeserializer)
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
-                    @Override
-                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
-                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                        keyNode.put("window-start", key.window().start())
-                                .put("region", key.value());
-
-                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                        valueNode.put("count", value);
-
-                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-                    }
-                });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
deleted file mode 100644
index 79649d1..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pipe;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
- * write data to a sink (output) topic.
- *
- * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
- * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PipeJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        builder.stream("streams-file-input").to("streams-pipe-output");
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
deleted file mode 100644
index 965eb79..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
-
-        KStream<String, String> source = builder.stream("streams-file-input");
-
-        KTable<String, Long> counts = source
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase().split(" "));
-                    }
-                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
-                    @Override
-                    public KeyValue<String, String> apply(String key, String value) {
-                        return new KeyValue<String, String>(value, value);
-                    }
-                })
-                .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");
-
-        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
deleted file mode 100644
index f5dd775..0000000
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountProcessorJob {
-
-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
-        @Override
-        public Processor<String, String> get() {
-            return new Processor<String, String>() {
-                private ProcessorContext context;
-                private KeyValueStore<String, Integer> kvStore;
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
-                    this.context = context;
-                    this.context.schedule(1000);
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
-                }
-
-                @Override
-                public void process(String dummy, String line) {
-                    String words[] = line.toLowerCase().split(" ");
-
-                    for (String word : words) {
-                        Integer oldValue = this.kvStore.get(word);
-
-                        if (oldValue == null) {
-                            this.kvStore.put(word, 1);
-                        } else {
-                            this.kvStore.put(word, oldValue + 1);
-                        }
-                    }
-
-                    context.commit();
-                }
-
-                @Override
-                public void punctuate(long timestamp) {
-                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
-                    System.out.println("----------- " + timestamp + " ----------- ");
-
-                    while (iter.hasNext()) {
-                        KeyValue<String, Integer> entry = iter.next();
-
-                        System.out.println("[" + entry.key + ", " + entry.value + "]");
-
-                        context.forward(entry.key, entry.value.toString());
-                    }
-
-                    iter.close();
-                }
-
-                @Override
-                public void close() {
-                    this.kvStore.close();
-                }
-            };
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("Source", "streams-file-input");
-
-        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
-        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
-
-        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
new file mode 100644
index 0000000..5fcd1f3
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+public class JsonPOJODeserializer<T> implements Deserializer<T> {
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    private Class<T> tClass;
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonPOJODeserializer() {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> props, boolean isKey) {
+        tClass = (Class<T>) props.get("JsonPOJOClass");
+    }
+
+    @Override
+    public T deserialize(String topic, byte[] bytes) {
+        if (bytes == null)
+            return null;
+
+        T data;
+        try {
+            data = objectMapper.readValue(bytes, tClass);
+        } catch (Exception e) {
+            throw new SerializationException(e);
+        }
+
+        return data;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
new file mode 100644
index 0000000..bb60327
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.streams.examples.pageview;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+public class JsonPOJOSerializer<T> implements Serializer<T> {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private Class<T> tClass;
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonPOJOSerializer() {
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> props, boolean isKey) {
+        tClass = (Class<T>) props.get("JsonPOJOClass");
+    }
+
+    @Override
+    public byte[] serialize(String topic, T data) {
+        if (data == null)
+            return null;
+
+        try {
+            return objectMapper.writeValueAsBytes(data);
+        } catch (Exception e) {
+            throw new SerializationException("Error serializing JSON message", e);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
new file mode 100644
index 0000000..6443193
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+/**
+ * A timestamp extractor implementation that tries to extract event time from
+ * the "timestamp" field in the Json formatted message.
+ */
+public class JsonTimestampExtractor implements TimestampExtractor {
+
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        if (record.value() instanceof PageViewTypedJob.PageView) {
+            return ((PageViewTypedJob.PageView) record.value()).timestamp;
+        }
+
+        if (record.value() instanceof PageViewTypedJob.UserProfile) {
+            return ((PageViewTypedJob.UserProfile) record.value()).timestamp;
+        }
+
+        if (record.value() instanceof JsonNode) {
+            return ((JsonNode) record.value()).get("timestamp").longValue();
+        }
+
+        throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
new file mode 100644
index 0000000..6a105fd
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using specific data types (here: JSON POJOs; Avro specific bindings, etc. would work as well) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewTypedJob {
+
+    // POJO classes
+    public static class PageView {
+        public String user;
+        public String page;
+        public Long timestamp;
+    }
+
+    public static class UserProfile {
+        public String region;
+        public Long timestamp;
+    }
+
+    public static class PageViewByRegion {
+        public String user;
+        public String page;
+        public String region;
+    }
+
+    public static class WindowedPageViewByRegion {
+        public long windowStart;
+        public String region;
+    }
+
+    public static class RegionCount {
+        public long count;
+        public String region;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+
+        // TODO: the following can be removed with a serialization factory
+        Map<String, Object> serdeProps = new HashMap<>();
+
+        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", PageView.class);
+        pageViewDeserializer.configure(serdeProps, false);
+
+        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileDeserializer.configure(serdeProps, false);
+
+        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileSerializer.configure(serdeProps, false);
+
+        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
+        wPageViewByRegionSerializer.configure(serdeProps, false);
+
+        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", RegionCount.class);
+        regionCountSerializer.configure(serdeProps, false);
+
+        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
+
+        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
+
+        KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
+                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+                    @Override
+                    public PageViewByRegion apply(PageView view, UserProfile profile) {
+                        PageViewByRegion viewByRegion = new PageViewByRegion();
+                        viewByRegion.user = view.user;
+                        viewByRegion.page = view.page;
+
+                        if (profile != null) {
+                            viewByRegion.region = profile.region;
+                        } else {
+                            viewByRegion.region = "UNKNOWN";
+                        }
+                        return viewByRegion;
+                    }
+                })
+                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+                    @Override
+                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+                        return new KeyValue<>(viewRegion.region, viewRegion);
+                    }
+                })
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                // TODO: we can merge this toStream().map(...) into a single toStream(...)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+                    @Override
+                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                        wViewByRegion.windowStart = key.window().start();
+                        wViewByRegion.region = key.value();
+
+                        RegionCount rCount = new RegionCount();
+                        rCount.region = key.value();
+                        rCount.count = value;
+
+                        return new KeyValue<>(wViewByRegion, rCount);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the streaming job would run forever,
+        // but in this example we just let it run for some time and then stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}
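
The five configure() blocks in main() are the boilerplate the "serialization factory" TODO
refers to. A hypothetical helper (not part of this commit) could collapse each serializer
setup into one call:

    // Hypothetical helper: wraps the "JsonPOJOClass" configure() boilerplate
    // used above for any JSON POJO type.
    public static <T> Serializer<T> jsonPOJOSerializerFor(Class<T> cls) {
        Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put("JsonPOJOClass", cls);
        Serializer<T> serializer = new JsonPOJOSerializer<>();
        serializer.configure(serdeProps, false);
        return serializer;
    }

    // Usage, replacing one of the blocks in main():
    // final Serializer<UserProfile> userProfileSerializer = jsonPOJOSerializerFor(UserProfile.class);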

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
new file mode 100644
index 0000000..e890589
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using general data types (here: JSON; Avro generic bindings, etc. would work as well) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewUntypedJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
+        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
+
+        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
+
+        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
+
+        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
+            @Override
+            public String apply(JsonNode record) {
+                return record.get("region").textValue();
+            }
+        });
+
+        KStream<JsonNode, JsonNode> regionCount = views
+                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+                    @Override
+                    public JsonNode apply(JsonNode view, String region) {
+                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                        return jNode.put("user", view.get("user").textValue())
+                                .put("page", view.get("page").textValue())
+                                .put("region", region == null ? "UNKNOWN" : region);
+                    }
+                })
+                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+                    @Override
+                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+                    }
+                })
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                // TODO: we can merge this toStream().map(...) into a single toStream(...)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+                    @Override
+                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                        keyNode.put("window-start", key.window().start())
+                                .put("region", key.value());
+
+                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                        valueNode.put("count", value);
+
+                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the streaming job would run forever,
+        // but in this example we just let it run for some time and then stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}
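
For reference, the final map() above emits key/value JSON pairs shaped as follows (the
window start and count values are made up for illustration):

    ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
    keyNode.put("window-start", 1456876800000L).put("region", "europe");
    // key:   {"window-start":1456876800000,"region":"europe"}

    ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
    valueNode.put("count", 3L);
    // value: {"count":3}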

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
new file mode 100644
index 0000000..8885ca2
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pipe;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
+ * write data to a sink (output) topic.
+ *
+ * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
+ * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PipeJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        builder.stream("streams-file-input").to("streams-pipe-output");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the streaming job would run forever,
+        // but in this example we just let it run for some time and then stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}
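
To drive the pipe from Java rather than the console producer, the plain producer client
will do; a sketch against the topic names used above (broker address and record contents
are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
    producer.send(new ProducerRecord<String, String>("streams-file-input", "hello streams"));
    producer.close();
    // The same record should then appear, unmodified, on "streams-pipe-output".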

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
new file mode 100644
index 0000000..82d216e
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text, and the histogram output is written to the topic "streams-wordcount-output", where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+
+        KStream<String, String> source = builder.stream("streams-file-input");
+
+        KTable<String, Long> counts = source
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return Arrays.asList(value.toLowerCase().split(" "));
+                    }
+                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                    @Override
+                    public KeyValue<String, String> apply(String key, String value) {
+                        return new KeyValue<String, String>(value, value);
+                    }
+                })
+                .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");
+
+        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the streaming job would run forever,
+        // but in this example we just let it run for some time and then stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}
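
The counts written to "streams-wordcount-output" can be read back with the matching
deserializers; a sketch (the group id and poll timeout are illustrative):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "wordcount-inspector");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

    KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
    for (ConsumerRecord<String, Long> record : consumer.poll(1000))
        System.out.println(record.key() + " -> " + record.value()); // e.g. streams -> 2
    consumer.close();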

http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
new file mode 100644
index 0000000..cb82656
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text, and the histogram output is written to the topic "streams-wordcount-processor-output", where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountProcessorJob {
+
+    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+
+        @Override
+        public Processor<String, String> get() {
+            return new Processor<String, String>() {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(1000);
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                }
+
+                @Override
+                public void process(String dummy, String line) {
+                    String[] words = line.toLowerCase().split(" ");
+
+                    for (String word : words) {
+                        Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+
+                    context.commit();
+                }
+
+                @Override
+                public void punctuate(long timestamp) {
+                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+                    System.out.println("----------- " + timestamp + " ----------- ");
+
+                    while (iter.hasNext()) {
+                        KeyValue<String, Integer> entry = iter.next();
+
+                        System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+                        context.forward(entry.key, entry.value.toString());
+                    }
+
+                    iter.close();
+                }
+
+                @Override
+                public void close() {
+                    this.kvStore.close();
+                }
+            };
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("Source", "streams-file-input");
+
+        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+
+        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the streaming job would run forever,
+        // but in this example we just let it run for some time and then stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}
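
The in-memory "Counts" store above is rebuilt from scratch on restart. Assuming the
Stores builder also exposes a persistent() variant in this version of the API (an
assumption worth verifying), a RocksDB-backed store would be a one-line change:

    // Sketch, assuming persistent() exists on this builder: swap the in-memory
    // "Counts" store for a persistent (RocksDB-backed) one.
    builder.addStateStore(
        Stores.create("Counts").withStringKeys().withIntegerValues().persistent().build(),
        "Process");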

