kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [2/2] kafka git commit: KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id
Date: Thu, 17 Mar 2016 17:41:54 GMT
KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id

@guozhangwang @ymatsuda: please review.

Author: Michael G. Noll <michael@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1081 from miguno/KAFKA-3411


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/958e10c8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/958e10c8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/958e10c8

Branch: refs/heads/trunk
Commit: 958e10c87ce293c3bf59bb9840eaaae915eff25e
Parents: 9a836d0
Author: Michael G. Noll <michael@confluent.io>
Authored: Thu Mar 17 10:41:48 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 17 10:41:48 2016 -0700

----------------------------------------------------------------------
 .../pageview/JsonTimestampExtractor.java        |   8 +-
 .../examples/pageview/PageViewTypedDemo.java    | 180 ++++++++++++++++++
 .../examples/pageview/PageViewTypedJob.java     | 184 -------------------
 .../examples/pageview/PageViewUntypedDemo.java  | 136 ++++++++++++++
 .../examples/pageview/PageViewUntypedJob.java   | 140 --------------
 .../kafka/streams/examples/pipe/PipeDemo.java   |  65 +++++++
 .../kafka/streams/examples/pipe/PipeJob.java    |  65 -------
 .../examples/wordcount/WordCountDemo.java       |  96 ++++++++++
 .../examples/wordcount/WordCountJob.java        |  96 ----------
 .../wordcount/WordCountProcessorDemo.java       | 137 ++++++++++++++
 .../wordcount/WordCountProcessorJob.java        | 137 --------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  12 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  14 +-
 .../streams/processor/PartitionGrouper.java     |   8 +-
 .../streams/processor/ProcessorContext.java     |   6 +-
 .../streams/processor/TopologyBuilder.java      |  42 ++---
 .../processor/internals/AbstractTask.java       |  16 +-
 .../internals/ProcessorContextImpl.java         |   9 +-
 .../internals/ProcessorStateManager.java        |  14 +-
 .../processor/internals/StandbyContextImpl.java |  14 +-
 .../processor/internals/StandbyTask.java        |  12 +-
 .../internals/StreamPartitionAssignor.java      |   4 +-
 .../streams/processor/internals/StreamTask.java |   6 +-
 .../processor/internals/StreamThread.java       |  28 +--
 .../state/internals/StoreChangeLogger.java      |   2 +-
 .../apache/kafka/streams/StreamsConfigTest.java |   7 +-
 .../internals/ProcessorStateManagerTest.java    |  28 +--
 .../internals/ProcessorTopologyTest.java        |   2 +-
 .../processor/internals/StandbyTaskTest.java    |  20 +-
 .../internals/StreamPartitionAssignorTest.java  |   2 +-
 .../processor/internals/StreamTaskTest.java     |   6 +-
 .../processor/internals/StreamThreadTest.java   |  34 ++--
 .../streams/smoketest/SmokeTestClient.java      |   2 +-
 .../apache/kafka/test/MockProcessorContext.java |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   6 +-
 35 files changed, 762 insertions(+), 780 deletions(-)
----------------------------------------------------------------------
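
For applications built against Kafka Streams, the visible part of this rename is the StreamsConfig key: "job.id" (StreamsConfig.JOB_ID_CONFIG) becomes "application.id" (StreamsConfig.APPLICATION_ID_CONFIG), as the example rewrites below show. A minimal sketch of the setting after this commit (the class name, application id and topic names are illustrative only, not taken from the patch):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    // hypothetical example class, not part of this commit
    public class ApplicationIdExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // before this commit the same value was set via StreamsConfig.JOB_ID_CONFIG ("job.id")
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();

            Thread.sleep(5000L);
            streams.close();
        }
    }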


http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 6443193..63e8377 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -29,12 +29,12 @@ public class JsonTimestampExtractor implements TimestampExtractor {
 
     @Override
     public long extract(ConsumerRecord<Object, Object> record) {
-        if (record.value() instanceof PageViewTypedJob.PageView) {
-            return ((PageViewTypedJob.PageView) record.value()).timestamp;
+        if (record.value() instanceof PageViewTypedDemo.PageView) {
+            return ((PageViewTypedDemo.PageView) record.value()).timestamp;
         }
 
-        if (record.value() instanceof PageViewTypedJob.UserProfile) {
-            return ((PageViewTypedJob.UserProfile) record.value()).timestamp;
+        if (record.value() instanceof PageViewTypedDemo.UserProfile) {
+            return ((PageViewTypedDemo.UserProfile) record.value()).timestamp;
         }
 
         if (record.value() instanceof JsonNode) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
new file mode 100644
index 0000000..4f9de29
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewTypedDemo {
+
+    // POJO classes
+    static public class PageView {
+        public String user;
+        public String page;
+        public Long timestamp;
+    }
+
+    static public class UserProfile {
+        public String region;
+        public Long timestamp;
+    }
+
+    static public class PageViewByRegion {
+        public String user;
+        public String page;
+        public String region;
+    }
+
+    static public class WindowedPageViewByRegion {
+        public long windowStart;
+        public String region;
+    }
+
+    static public class RegionCount {
+        public long count;
+        public String region;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+        // TODO: the following can be removed with a serialization factory
+        Map<String, Object> serdeProps = new HashMap<>();
+
+        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", PageView.class);
+        pageViewDeserializer.configure(serdeProps, false);
+
+        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileDeserializer.configure(serdeProps, false);
+
+        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileSerializer.configure(serdeProps, false);
+
+        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
+        wPageViewByRegionSerializer.configure(serdeProps, false);
+
+        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", RegionCount.class);
+        regionCountSerializer.configure(serdeProps, false);
+
+        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
+
+        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
+
+        KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
+                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+                    @Override
+                    public PageViewByRegion apply(PageView view, UserProfile profile) {
+                        PageViewByRegion viewByRegion = new PageViewByRegion();
+                        viewByRegion.user = view.user;
+                        viewByRegion.page = view.page;
+
+                        if (profile != null) {
+                            viewByRegion.region = profile.region;
+                        } else {
+                            viewByRegion.region = "UNKNOWN";
+                        }
+                        return viewByRegion;
+                    }
+                })
+                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+                    @Override
+                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+                        return new KeyValue<>(viewRegion.region, viewRegion);
+                    }
+                })
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, stringDeserializer)
+                // TODO: we can merge this toStream().map(...) with a single toStream(...)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+                    @Override
+                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                        wViewByRegion.windowStart = key.window().start();
+                        wViewByRegion.region = key.value();
+
+                        RegionCount rCount = new RegionCount();
+                        rCount.region = key.value();
+                        rCount.count = value;
+
+                        return new KeyValue<>(wViewByRegion, rCount);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
deleted file mode 100644
index 1fcb403..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from  a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewTypedJob {
-
-    // POJO classes
-    static public class PageView {
-        public String user;
-        public String page;
-        public Long timestamp;
-    }
-
-    static public class UserProfile {
-        public String region;
-        public Long timestamp;
-    }
-
-    static public class PageViewByRegion {
-        public String user;
-        public String page;
-        public String region;
-    }
-
-    static public class WindowedPageViewByRegion {
-        public long windowStart;
-        public String region;
-    }
-
-    static public class RegionCount {
-        public long count;
-        public String region;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
-
-        // TODO: the following can be removed with a serialization factory
-        Map<String, Object> serdeProps = new HashMap<>();
-
-        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", PageView.class);
-        pageViewDeserializer.configure(serdeProps, false);
-
-        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileDeserializer.configure(serdeProps, false);
-
-        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileSerializer.configure(serdeProps, false);
-
-        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
-        wPageViewByRegionSerializer.configure(serdeProps, false);
-
-        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", RegionCount.class);
-        regionCountSerializer.configure(serdeProps, false);
-
-        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
-
-        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
-
-        KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
-                    @Override
-                    public PageViewByRegion apply(PageView view, UserProfile profile) {
-                        PageViewByRegion viewByRegion = new PageViewByRegion();
-                        viewByRegion.user = view.user;
-                        viewByRegion.page = view.page;
-
-                        if (profile != null) {
-                            viewByRegion.region = profile.region;
-                        } else {
-                            viewByRegion.region = "UNKNOWN";
-                        }
-                        return viewByRegion;
-                    }
-                })
-                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
-                    @Override
-                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
-                        return new KeyValue<>(viewRegion.region, viewRegion);
-                    }
-                })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, stringDeserializer)
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
-                    @Override
-                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
-                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                        wViewByRegion.windowStart = key.window().start();
-                        wViewByRegion.region = key.value();
-
-                        RegionCount rCount = new RegionCount();
-                        rCount.region = key.value();
-                        rCount.count = value;
-
-                        return new KeyValue<>(wViewByRegion, rCount);
-                    }
-                });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
new file mode 100644
index 0000000..9377095
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewUntypedDemo {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
+        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
+
+        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
+
+        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
+
+        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
+            @Override
+            public String apply(JsonNode record) {
+                return record.get("region").textValue();
+            }
+        });
+
+        KStream<JsonNode, JsonNode> regionCount = views
+                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+                    @Override
+                    public JsonNode apply(JsonNode view, String region) {
+                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                        return jNode.put("user", view.get("user").textValue())
+                                .put("page", view.get("page").textValue())
+                                .put("region", region == null ? "UNKNOWN" : region);
+                    }
+                })
+                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+                    @Override
+                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+                    }
+                })
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, stringDeserializer)
+                // TODO: we can merge this toStream().map(...) with a single toStream(...)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+                    @Override
+                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                        keyNode.put("window-start", key.window().start())
+                                .put("region", key.value());
+
+                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                        valueNode.put("count", value);
+
+                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
deleted file mode 100644
index fb1a55d..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.kafka.connect.json.JsonDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from  a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewUntypedJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
-        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
-        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
-
-        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
-
-        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
-
-        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
-            @Override
-            public String apply(JsonNode record) {
-                return record.get("region").textValue();
-            }
-        });
-
-        KStream<JsonNode, JsonNode> regionCount = views
-                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
-                    @Override
-                    public JsonNode apply(JsonNode view, String region) {
-                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                        return jNode.put("user", view.get("user").textValue())
-                                .put("page", view.get("page").textValue())
-                                .put("region", region == null ? "UNKNOWN" : region);
-                    }
-                })
-                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
-                    @Override
-                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
-                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
-                    }
-                })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, stringDeserializer)
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
-                    @Override
-                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
-                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                        keyNode.put("window-start", key.window().start())
-                                .put("region", key.value());
-
-                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                        valueNode.put("count", value);
-
-                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-                    }
-                });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
new file mode 100644
index 0000000..c37c68a
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pipe;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
+ * write data to a sink (output) topic.
+ *
+ * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
+ * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PipeDemo {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        builder.stream("streams-file-input").to("streams-pipe-output");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
deleted file mode 100644
index 8885ca2..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pipe;
-
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
- * write data to a sink (output) topic.
- *
- * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
- * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PipeJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        builder.stream("streams-file-input").to("streams-pipe-output");
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
new file mode 100644
index 0000000..03d5142
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountDemo {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+
+        KStream<String, String> source = builder.stream("streams-file-input");
+
+        KTable<String, Long> counts = source
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return Arrays.asList(value.toLowerCase().split(" "));
+                    }
+                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                    @Override
+                    public KeyValue<String, String> apply(String key, String value) {
+                        return new KeyValue<String, String>(value, value);
+                    }
+                })
+                .countByKey(stringSerializer, stringDeserializer, "Counts");
+
+        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
deleted file mode 100644
index 2b51a44..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-
-        KStream<String, String> source = builder.stream("streams-file-input");
-
-        KTable<String, Long> counts = source
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase().split(" "));
-                    }
-                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
-                    @Override
-                    public KeyValue<String, String> apply(String key, String value) {
-                        return new KeyValue<String, String>(value, value);
-                    }
-                })
-                .countByKey(stringSerializer, stringDeserializer, "Counts");
-
-        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

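The removed WordCountJob above is superseded by the renamed Demo class added earlier in this diff; aside from the class rename, the visible configuration change is the identifier key. A minimal sketch of the renamed setting, assuming the value "streams-wordcount" carries over from the old file:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class WordCountConfigSketch {
        public static Properties baseProperties() {
            Properties props = new Properties();
            // application.id replaces the former job.id key for the same purpose
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return props;
        }
    }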
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
new file mode 100644
index 0000000..b651b3a
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountProcessorDemo {
+
+    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+
+        @Override
+        public Processor<String, String> get() {
+            return new Processor<String, String>() {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(1000);
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                }
+
+                @Override
+                public void process(String dummy, String line) {
+                    String[] words = line.toLowerCase().split(" ");
+
+                    for (String word : words) {
+                        Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+
+                    context.commit();
+                }
+
+                @Override
+                public void punctuate(long timestamp) {
+                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+                    System.out.println("----------- " + timestamp + " ----------- ");
+
+                    while (iter.hasNext()) {
+                        KeyValue<String, Integer> entry = iter.next();
+
+                        System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+                        context.forward(entry.key, entry.value.toString());
+                    }
+
+                    iter.close();
+                }
+
+                @Override
+                public void close() {
+                    this.kvStore.close();
+                }
+            };
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("Source", "streams-file-input");
+
+        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+
+        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would run forever;
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
deleted file mode 100644
index cb82656..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountProcessorJob {
-
-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
-        @Override
-        public Processor<String, String> get() {
-            return new Processor<String, String>() {
-                private ProcessorContext context;
-                private KeyValueStore<String, Integer> kvStore;
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
-                    this.context = context;
-                    this.context.schedule(1000);
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
-                }
-
-                @Override
-                public void process(String dummy, String line) {
-                    String[] words = line.toLowerCase().split(" ");
-
-                    for (String word : words) {
-                        Integer oldValue = this.kvStore.get(word);
-
-                        if (oldValue == null) {
-                            this.kvStore.put(word, 1);
-                        } else {
-                            this.kvStore.put(word, oldValue + 1);
-                        }
-                    }
-
-                    context.commit();
-                }
-
-                @Override
-                public void punctuate(long timestamp) {
-                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
-                    System.out.println("----------- " + timestamp + " ----------- ");
-
-                    while (iter.hasNext()) {
-                        KeyValue<String, Integer> entry = iter.next();
-
-                        System.out.println("[" + entry.key + ", " + entry.value + "]");
-
-                        context.forward(entry.key, entry.value.toString());
-                    }
-
-                    iter.close();
-                }
-
-                @Override
-                public void close() {
-                    this.kvStore.close();
-                }
-            };
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("Source", "streams-file-input");
-
-        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
-        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
-
-        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 15d6d8b..20958e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
  * more threads specified in the configs for the processing work.
  * <p>
- * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same job ID (whether in this same process, on other processes
+ * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes
  * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
  * based on the assignment of the input topic partitions so that all partitions are being
  * consumed. If instances are added or fail, all instances will rebalance the partition assignment among themselves
@@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A simple example might look like this:
  * <pre>
  *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
- *    props.put(StreamsConfig.JOB_ID_CONFIG, "my-job");
+ *    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
  *    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  *    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  *    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -113,12 +113,12 @@ public class KafkaStreams {
 
         this.processId = UUID.randomUUID();
 
-        // JobId is a required config and hence should always have value
-        String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG);
+        // The application ID is a required config and hence should always have value
+        String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
-            clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+            clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
 
         List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
@@ -132,7 +132,7 @@ public class KafkaStreams {
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
+            this.threads[i] = new StreamThread(builder, config, applicationId, clientId, processId, metrics, time);
         }
     }
 

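The updated javadoc snippet above, expanded into a runnable sketch; the topic names and the pass-through topology are illustrative only, while the constructor and serializer settings follow the example programs in this commit:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ApplicationIdSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // instances sharing this application.id are coordinated as one application;
            // it is also the default client-id prefix (see the constructor change above)
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("my-input-topic").to("my-output-topic");   // illustrative pass-through topology

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }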
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index c4b8ffe..52fdbd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -83,13 +83,13 @@ public class StreamsConfig extends AbstractConfig {
     public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
     private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
 
-    /** <code>job.id</code> */
-    public static final String JOB_ID_CONFIG = "job.id";
-    public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
+    /** <code>application.id</code> */
+    public static final String APPLICATION_ID_CONFIG = "application.id";
+    public static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
 
     /** <code>replication.factor</code> */
     public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
-    public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the job.";
+    public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
 
     /** <code>key.serializer</code> */
     public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -124,10 +124,10 @@ public class StreamsConfig extends AbstractConfig {
     private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
 
     static {
-        CONFIG = new ConfigDef().define(JOB_ID_CONFIG,      // required with no default value
+        CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // required with no default value
                                         Type.STRING,
                                         Importance.HIGH,
-                                        StreamsConfig.JOB_ID_DOC)
+                                        StreamsConfig.APPLICATION_ID_DOC)
                                 .define(BOOTSTRAP_SERVERS_CONFIG,       // required with no default value
                                         Type.STRING,
                                         Importance.HIGH,
@@ -297,7 +297,7 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     private void removeStreamsSpecificConfigs(Map<String, Object> props) {
-        props.remove(StreamsConfig.JOB_ID_CONFIG);
+        props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
         props.remove(StreamsConfig.STATE_DIR_CONFIG);
         props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);

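Because application.id is defined as required with no default (see the define() call above), omitting it should fail validation when the config object is created. A small sketch, assuming the public StreamsConfig(Map) constructor and the getString accessor used elsewhere in this diff:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;

    public class RequiredApplicationIdSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));

            try {
                new StreamsConfig(new HashMap<String, Object>());   // no application.id (nor other required configs)
            } catch (ConfigException expected) {
                System.out.println("rejected as expected: " + expected.getMessage());
            }
        }
    }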
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index ae9844d..0c94084 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -28,7 +28,8 @@ import java.util.Set;
  *
  * This grouper also acts as the stream task creation function along with partition distribution
  * such that each generated partition group is assigned with a distinct {@link TaskId};
- * the created task ids will then be assigned to Kafka Streams instances that host the stream job.
+ * the created task ids will then be assigned to Kafka Streams instances that host the stream
+ * processing application.
  */
 public interface PartitionGrouper {
 
@@ -37,9 +38,10 @@ public interface PartitionGrouper {
      * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this
      * interface. See {@link DefaultPartitionGrouper} for more information.
      *
-     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics
+     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups(String) topic group} id to topics
      * @param metadata Metadata of the consuming cluster
      * @return a map of task ids to groups of partitions
      */
     Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata);
-}
+
+}
\ No newline at end of file

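For illustration of the contract only, here is a deliberately naive grouper that maps each topic group to a single task holding all of that group's partitions; this is not what DefaultPartitionGrouper does, and the client-side types (Cluster, PartitionInfo, TopicPartition, TaskId) are assumed from the existing public API:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;

    public class SingleTaskPerGroupGrouper implements PartitionGrouper {
        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
            Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
            for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
                Set<TopicPartition> partitions = new HashSet<>();
                for (String topic : entry.getValue()) {
                    List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
                    if (infos == null)
                        continue;   // topic not present in the consuming cluster's metadata
                    for (PartitionInfo info : infos)
                        partitions.add(new TopicPartition(topic, info.partition()));
                }
                // one task per topic group; a per-partition grouping is what the default grouper produces
                groups.put(new TaskId(entry.getKey(), 0), partitions);
            }
            return groups;
        }
    }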
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 79376ba..e9d5252 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -29,11 +29,11 @@ import java.io.File;
 public interface ProcessorContext {
 
     /**
-     * Returns the job id
+     * Returns the application id
      *
-     * @return the job id
+     * @return the application id
      */
-    String jobId();
+    String applicationId();
 
     /**
      * Returns the task id

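Processors that previously read context.jobId() now call context.applicationId(); a minimal sketch against the Processor API shown earlier in this diff (topology wiring omitted):

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class ApplicationIdLoggingProcessor implements Processor<String, String> {
        private String applicationId;

        @Override
        public void init(ProcessorContext context) {
            // renamed accessor: returns the configured application.id
            this.applicationId = context.applicationId();
        }

        @Override
        public void process(String key, String value) {
            System.out.println(applicationId + ": " + key + " -> " + value);
        }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }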
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 6e5aec5..ab7122b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -85,7 +85,7 @@ public class TopologyBuilder {
             this.name = name;
         }
 
-        public abstract ProcessorNode build(String jobId);
+        public abstract ProcessorNode build(String applicationId);
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
@@ -105,7 +105,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String jobId) {
+        public ProcessorNode build(String applicationId) {
             return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
@@ -124,7 +124,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String jobId) {
+        public ProcessorNode build(String applicationId) {
             return new SourceNode(name, keyDeserializer, valDeserializer);
         }
     }
@@ -147,10 +147,10 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String jobId) {
+        public ProcessorNode build(String applicationId) {
             if (internalTopicNames.contains(topic)) {
-                // prefix the job id to the internal topic name
-                return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer, partitioner);
+                // prefix the internal topic name with the application id
+                return new SinkNode(name, applicationId + "-" + topic, keySerializer, valSerializer, partitioner);
             } else {
                 return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
             }
@@ -496,7 +496,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups(String jobId) {
+    public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
         Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
         if (nodeGroups == null)
@@ -514,8 +514,8 @@ public class TopologyBuilder {
                     // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
                         if (this.internalTopicNames.contains(topic)) {
-                            // prefix the job id to the internal topic name
-                            String internalTopic = jobId + "-" + topic;
+                            // prefix the internal topic name with the application id
+                            String internalTopic = applicationId + "-" + topic;
                             internalSourceTopics.add(internalTopic);
                             sourceTopics.add(internalTopic);
                         } else {
@@ -528,8 +528,8 @@ public class TopologyBuilder {
                 String topic = nodeToSinkTopic.get(node);
                 if (topic != null) {
                     if (internalTopicNames.contains(topic)) {
-                        // prefix the job id to the change log topic name
-                        sinkTopics.add(jobId + "-" + topic);
+                        // prefix the change log topic name with the application id
+                        sinkTopics.add(applicationId + "-" + topic);
                     } else {
                         sinkTopics.add(topic);
                     }
@@ -538,8 +538,8 @@ public class TopologyBuilder {
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
                     if (stateFactory.isInternal && stateFactory.users.contains(node)) {
-                        // prefix the job id to the change log topic name
-                        stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+                        // prefix the change log topic name with the application id
+                        stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
                     }
                 }
             }
@@ -637,7 +637,7 @@ public class TopologyBuilder {
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public ProcessorTopology build(String jobId, Integer topicGroupId) {
+    public ProcessorTopology build(String applicationId, Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -645,11 +645,11 @@ public class TopologyBuilder {
             // when nodeGroup is null, we build the full topology. this is used in some tests.
             nodeGroup = null;
         }
-        return build(jobId, nodeGroup);
+        return build(applicationId, nodeGroup);
     }
 
     @SuppressWarnings("unchecked")
-    private ProcessorTopology build(String jobId, Set<String> nodeGroup) {
+    private ProcessorTopology build(String applicationId, Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -658,7 +658,7 @@ public class TopologyBuilder {
         // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
         for (NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                ProcessorNode node = factory.build(jobId);
+                ProcessorNode node = factory.build(applicationId);
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
 
@@ -674,8 +674,8 @@ public class TopologyBuilder {
                 } else if (factory instanceof SourceNodeFactory) {
                     for (String topic : ((SourceNodeFactory) factory).topics) {
                         if (internalTopicNames.contains(topic)) {
-                            // prefix the job id to the internal topic name
-                            topicSourceMap.put(jobId + "-" + topic, (SourceNode) node);
+                            // prefix the internal topic name with the application id
+                            topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node);
                         } else {
                             topicSourceMap.put(topic, (SourceNode) node);
                         }
@@ -697,11 +697,11 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
-    public Set<String> sourceTopics(String jobId) {
+    public Set<String> sourceTopics(String applicationId) {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {
-                topics.add(jobId + "-" + topic);
+                topics.add(applicationId + "-" + topic);
             } else {
                 topics.add(topic);
             }

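The decoration applied in topicGroups(), build(), and sourceTopics() above follows a single convention: internal topic names are prefixed with the application id and a dash. A hypothetical helper (not part of this commit) that mirrors the convention:

    public class InternalTopicNaming {
        // mirrors the applicationId + "-" + topic pattern used throughout TopologyBuilder above
        public static String decorate(String applicationId, String internalTopic) {
            return applicationId + "-" + internalTopic;
        }

        public static void main(String[] args) {
            // e.g. an internal topic named "my-repartition" becomes
            // "my-stream-processing-application-my-repartition"
            System.out.println(decorate("my-stream-processing-application", "my-repartition"));
        }
    }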
