kafka-commits mailing list archives

From guozh...@apache.org
Subject [6/6] kafka git commit: KIP-28: Add a processor client for Kafka Streaming
Date Sat, 26 Sep 2015 00:24:25 GMT
KIP-28: Add a processor client for Kafka Streaming

This work has been contributed by Jesse Anderson, Randall Hauch, Yasuhiro Matsuda, and Guozhang Wang. The detailed design can be found at https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client.

Author: Guozhang Wang <wangguoz@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro.matsuda@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Author: ymatsuda <yasuhiro.matsuda@gmail.com>
Author: Randall Hauch <rhauch@gmail.com>
Author: Jesse Anderson <jesse@smokinghand.com>
Author: Ismael Juma <ismael@juma.me.uk>
Author: Jesse Anderson <eljefe6a@gmail.com>

Reviewers: Ismael Juma, Randall Hauch, Edward Ribeiro, Gwen Shapira, Jun Rao, Jay Kreps, Yasuhiro Matsuda, Guozhang Wang

Closes #130 from guozhangwang/streaming


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/263c10ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/263c10ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/263c10ab

Branch: refs/heads/trunk
Commit: 263c10ab7c8e8fde9d3566bf59dccaa454ee2605
Parents: ad120d5
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Sep 25 17:27:58 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 25 17:27:58 2015 -0700

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   5 +
 build.gradle                                    |  76 ++-
 checkstyle/checkstyle.xml                       |   8 +-
 checkstyle/import-control.xml                   |  16 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |   4 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   2 +
 .../kafka/clients/consumer/MockConsumer.java    |  18 +-
 .../kafka/clients/producer/KafkaProducer.java   |   2 +
 .../kafka/clients/producer/ProducerConfig.java  |   4 +-
 .../kafka/common/config/AbstractConfig.java     |   4 +
 .../common/serialization/LongDeserializer.java  |  44 ++
 .../common/serialization/LongSerializer.java    |  42 ++
 .../org/apache/kafka/common/utils/Utils.java    |  41 ++
 settings.gradle                                 |   4 +-
 .../apache/kafka/streams/KafkaStreaming.java    | 125 +++++
 .../apache/kafka/streams/StreamingConfig.java   | 201 ++++++++
 .../kafka/streams/examples/KStreamJob.java      |  84 ++++
 .../kafka/streams/examples/ProcessorJob.java    | 112 +++++
 .../examples/WallclockTimestampExtractor.java   |  28 ++
 .../apache/kafka/streams/kstream/KStream.java   | 156 ++++++
 .../kafka/streams/kstream/KStreamBuilder.java   |  65 +++
 .../kafka/streams/kstream/KStreamWindowed.java  |  38 ++
 .../apache/kafka/streams/kstream/KeyValue.java  |  34 ++
 .../kafka/streams/kstream/KeyValueMapper.java   |  23 +
 .../apache/kafka/streams/kstream/Predicate.java |  24 +
 .../kafka/streams/kstream/SlidingWindowDef.java | 265 +++++++++++
 .../kafka/streams/kstream/ValueJoiner.java      |  23 +
 .../kafka/streams/kstream/ValueMapper.java      |  23 +
 .../apache/kafka/streams/kstream/Window.java    |  36 ++
 .../apache/kafka/streams/kstream/WindowDef.java |  25 +
 .../kstream/internals/FilteredIterator.java     |  63 +++
 .../kstream/internals/KStreamBranch.java        |  52 ++
 .../kstream/internals/KStreamFilter.java        |  48 ++
 .../kstream/internals/KStreamFlatMap.java       |  47 ++
 .../kstream/internals/KStreamFlatMapValues.java |  47 ++
 .../streams/kstream/internals/KStreamImpl.java  | 201 ++++++++
 .../streams/kstream/internals/KStreamJoin.java  |  96 ++++
 .../streams/kstream/internals/KStreamMap.java   |  46 ++
 .../kstream/internals/KStreamMapValues.java     |  45 ++
 .../kstream/internals/KStreamPassThrough.java   |  37 ++
 .../kstream/internals/KStreamWindow.java        |  68 +++
 .../kstream/internals/KStreamWindowedImpl.java  |  54 +++
 .../kstream/internals/WindowSupport.java        | 159 +++++++
 .../streams/processor/AbstractProcessor.java    |  71 +++
 .../kafka/streams/processor/Processor.java      |  59 +++
 .../streams/processor/ProcessorContext.java     | 106 +++++
 .../kafka/streams/processor/ProcessorDef.java   |  23 +
 .../kafka/streams/processor/RestoreFunc.java    |  27 ++
 .../kafka/streams/processor/StateStore.java     |  52 ++
 .../streams/processor/TimestampExtractor.java   |  34 ++
 .../streams/processor/TopologyBuilder.java      | 293 ++++++++++++
 .../streams/processor/TopologyException.java    |  38 ++
 .../internals/MinTimestampTracker.java          |  67 +++
 .../processor/internals/PartitionGroup.java     | 165 +++++++
 .../internals/ProcessorContextImpl.java         | 214 +++++++++
 .../processor/internals/ProcessorNode.java      |  70 +++
 .../internals/ProcessorStateManager.java        | 232 +++++++++
 .../processor/internals/ProcessorTopology.java  |  53 +++
 .../processor/internals/PunctuationQueue.java   |  56 +++
 .../internals/PunctuationSchedule.java          |  41 ++
 .../streams/processor/internals/Punctuator.java |  24 +
 .../processor/internals/RecordCollector.java    |  80 ++++
 .../processor/internals/RecordQueue.java        | 140 ++++++
 .../streams/processor/internals/SinkNode.java   |  64 +++
 .../streams/processor/internals/SourceNode.java |  64 +++
 .../streams/processor/internals/Stamped.java    |  38 ++
 .../processor/internals/StampedRecord.java      |  52 ++
 .../streams/processor/internals/StreamTask.java | 352 ++++++++++++++
 .../processor/internals/StreamThread.java       | 477 +++++++++++++++++++
 .../processor/internals/TimestampTracker.java   |  58 +++
 .../org/apache/kafka/streams/state/Entry.java   |  42 ++
 .../streams/state/InMemoryKeyValueStore.java    | 145 ++++++
 .../kafka/streams/state/KeyValueIterator.java   |  29 ++
 .../kafka/streams/state/KeyValueStore.java      |  86 ++++
 .../streams/state/MeteredKeyValueStore.java     | 273 +++++++++++
 .../kafka/streams/state/OffsetCheckpoint.java   | 172 +++++++
 .../streams/state/RocksDBKeyValueStore.java     | 276 +++++++++++
 .../streams/kstream/KStreamBuilderTest.java     |  34 ++
 .../kstream/internals/FilteredIteratorTest.java |  94 ++++
 .../kstream/internals/KStreamBranchTest.java    |  90 ++++
 .../kstream/internals/KStreamFilterTest.java    |  85 ++++
 .../kstream/internals/KStreamFlatMapTest.java   |  80 ++++
 .../internals/KStreamFlatMapValuesTest.java     |  77 +++
 .../kstream/internals/KStreamImplTest.java      | 138 ++++++
 .../kstream/internals/KStreamJoinTest.java      | 164 +++++++
 .../kstream/internals/KStreamMapTest.java       |  73 +++
 .../kstream/internals/KStreamMapValuesTest.java |  71 +++
 .../kstream/internals/KStreamWindowedTest.java  |  91 ++++
 .../streams/processor/TopologyBuilderTest.java  |  99 ++++
 .../internals/MinTimestampTrackerTest.java      |  93 ++++
 .../processor/internals/PartitionGroupTest.java | 102 ++++
 .../internals/ProcessorStateManagerTest.java    | 449 +++++++++++++++++
 .../internals/ProcessorTopologyTest.java        | 326 +++++++++++++
 .../internals/PunctuationQueueTest.java         |  85 ++++
 .../processor/internals/RecordQueueTest.java    | 116 +++++
 .../processor/internals/StreamTaskTest.java     | 186 ++++++++
 .../processor/internals/StreamThreadTest.java   | 389 +++++++++++++++
 .../apache/kafka/test/KStreamTestDriver.java    |  95 ++++
 .../apache/kafka/test/MockProcessorContext.java | 143 ++++++
 .../org/apache/kafka/test/MockProcessorDef.java |  58 +++
 .../org/apache/kafka/test/MockSourceNode.java   |  46 ++
 .../kafka/test/MockTimestampExtractor.java      |  30 ++
 .../kafka/test/ProcessorTopologyTestDriver.java | 317 ++++++++++++
 .../apache/kafka/test/UnlimitedWindowDef.java   | 104 ++++
 104 files changed, 10083 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index dd37df4..d8a111e 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -56,6 +56,11 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/streams/build/libs/kafka-streams*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 for file in $base_dir/tools/build/libs/kafka-tools*.jar;
 do
   CLASSPATH=$CLASSPATH:$file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fecc3eb..02b1db5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,25 +215,22 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
 }
 
 def copycatPkgs = ['copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
-def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
+def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + copycatPkgs
 
 tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) {
-}
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }
 
 tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7'] + pkgs.collect { it + ":srcJar" }) { }
 
 tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7'] + pkgs.collect { it + ":docsJar" }) { }
 
 tasks.create(name: "testCopycat", dependsOn: copycatPkgs.collect { it + ":test" }) {}
-tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) {
-}
+tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) { }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
 }
 
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) {
-}
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) { }
 
 project(':core') {
   println "Building project 'core' with Scala version $scalaVersion"
@@ -518,6 +515,71 @@ project(':tools') {
         dependsOn 'copyDependantLibs'
     }
 
+    artifacts {
+        archives testJar
+    }
+
+    configurations {
+        archives.extendsFrom (testCompile)
+    }
+
+    checkstyle {
+        configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    }
+    test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':streams') {
+    apply plugin: 'checkstyle'
+    archivesBaseName = "kafka-streams"
+
+    dependencies {
+        compile project(':clients')
+        compile "$slf4jlog4j"
+        compile 'org.rocksdb:rocksdbjni:3.10.1'
+
+        testCompile "$junit"
+        testCompile project(path: ':clients', configuration: 'archives')
+    }
+
+    task testJar(type: Jar) {
+        classifier = 'test'
+        from sourceSets.test.output
+    }
+
+    test {
+        testLogging {
+            events "passed", "skipped", "failed"
+            exceptionFormat = 'full'
+        }
+    }
+
+    javadoc {
+        include "**/org/apache/kafka/streams/*"
+    }
+
+    tasks.create(name: "copyDependantLibs", type: Copy) {
+        from (configurations.testRuntime) {
+            include('slf4j-log4j12*')
+        }
+        from (configurations.runtime) {
+            exclude('kafka-clients*')
+        }
+        into "$buildDir/dependant-libs-${scalaVersion}"
+    }
+
+    jar {
+        dependsOn 'copyDependantLibs'
+    }
+
+    artifacts {
+      archives testJar
+    }
+
+    configurations {
+      archives.extendsFrom (testCompile)
+    }
+
     checkstyle {
         configFile = new File(rootDir, "checkstyle/checkstyle.xml")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index a215ff3..999fd6c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -53,9 +53,13 @@
     </module>
     <module name="LocalVariableName"/>
     <module name="LocalFinalVariableName"/>
-    <module name="ClassTypeParameterName"/>
     <module name="MemberName"/>
-    <module name="MethodTypeParameterName"/>
+    <module name="ClassTypeParameterName">
+      <property name="format" value="^[A-Z0-9]*$"/>
+    </module>
+    <module name="MethodTypeParameterName">
+      <property name="format" value="^[A-Z0-9]*$"/>
+    </module>
     <module name="PackageName"/>
     <module name="ParameterName"/>
     <module name="StaticVariableName"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d58c472..7b748ec 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -90,8 +90,8 @@
 	</subpackage>
 
 	<subpackage name="clients">
-		<allow pkg="org.apache.kafka.common" />
 		<allow pkg="org.slf4j" />
+		<allow pkg="org.apache.kafka.common" />
 		<allow pkg="org.apache.kafka.clients" exact-match="true"/>
 		<allow pkg="org.apache.kafka.test" />
 
@@ -111,6 +111,20 @@
 		</subpackage>
 	</subpackage>
 
+	<subpackage name="streams">
+		<allow pkg="org.apache.kafka.common"/>
+		<allow pkg="org.apache.kafka.test"/>
+		<allow pkg="org.apache.kafka.clients"/>
+		<allow pkg="org.apache.kafka.clients.producer" exact-match="true"/>
+		<allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
+
+		<allow pkg="org.apache.kafka.streams"/>
+
+		<subpackage name="state">
+			<allow pkg="org.rocksdb" />
+		</subpackage>
+	</subpackage>
+
 	<subpackage name="log4jappender">
 		<allow pkg="org.apache.log4j" />
 		<allow pkg="org.apache.kafka.clients" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b9a2d4e..347a5bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -150,11 +150,11 @@ public class ConsumerConfig extends AbstractConfig {
 
     /** <code>key.deserializer</code> */
     public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
-    private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
+    public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
 
     /** <code>value.deserializer</code> */
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
-    private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
+    public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
 
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2a3c763..68f61bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -550,6 +550,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         Deserializer.class);
                 this.keyDeserializer.configure(config.originals(), true);
             } else {
+                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                 this.keyDeserializer = keyDeserializer;
             }
             if (valueDeserializer == null) {
@@ -557,6 +558,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         Deserializer.class);
                 this.valueDeserializer.configure(config.originals(), false);
             } else {
+                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                 this.valueDeserializer = valueDeserializer;
             }
             this.fetcher = new Fetcher<K, V>(this.client,

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 1f802a8..3c0f261 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -46,6 +46,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Map<String, List<PartitionInfo>> partitions;
     private final SubscriptionState subscriptions;
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private Set<TopicPartition> paused;
     private boolean closed;
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
@@ -57,8 +58,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.subscriptions = new SubscriptionState(offsetResetStrategy);
-        this.partitions = new HashMap<String, List<PartitionInfo>>();
-        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        this.partitions = new HashMap<>();
+        this.records = new HashMap<>();
+        this.paused = new HashSet<>();
         this.closed = false;
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
@@ -288,14 +290,18 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public void pause(TopicPartition... partitions) {
-        for (TopicPartition partition : partitions)
+        for (TopicPartition partition : partitions) {
             subscriptions.pause(partition);
+            paused.add(partition);
+        }
     }
 
     @Override
     public void resume(TopicPartition... partitions) {
-        for (TopicPartition partition : partitions)
+        for (TopicPartition partition : partitions) {
             subscriptions.resume(partition);
+            paused.remove(partition);
+        }
     }
 
     @Override
@@ -332,6 +338,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
+    public Set<TopicPartition> paused() {
+        return Collections.unmodifiableSet(new HashSet<>(paused));
+    }
+
     private void ensureNotClosed() {
         if (this.closed)
             throw new IllegalStateException("This consumer has already been closed.");
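
With this change the mock now records which partitions are paused so that tests can assert on it. A minimal usage sketch (the topic name and the 0.9-era assign(List) call are assumptions):

    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition partition = new TopicPartition("topic1", 0);
    consumer.assign(Arrays.asList(partition));   // pause/resume require an assigned partition
    consumer.pause(partition);
    consumer.paused().contains(partition);       // true until resume(partition) is called
    consumer.resume(partition);
    consumer.paused().isEmpty();                 // true again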

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 804d569..3a783ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -259,6 +259,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         Serializer.class);
                 this.keySerializer.configure(config.originals(), true);
             } else {
+                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                 this.keySerializer = keySerializer;
             }
             if (valueSerializer == null) {
@@ -266,6 +267,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         Serializer.class);
                 this.valueSerializer.configure(config.originals(), false);
             } else {
+                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                 this.valueSerializer = valueSerializer;
             }
             config.logUnused();

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 06f00a9..6969f61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -164,11 +164,11 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>key.serializer</code> */
     public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
-    private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";
+    public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";
 
     /** <code>value.serializer</code> */
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
-    private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
+    public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
 
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 12a1927..2961e09 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -62,6 +62,10 @@ public class AbstractConfig {
         return values.get(key);
     }
 
+    public void ignore(String key) {
+        used.add(key);
+    }
+
     public Short getShort(String key) {
         return (Short) get(key);
     }
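
The new ignore(key) hook marks a config key as used without actually reading it. The KafkaProducer and KafkaConsumer changes below call it when a serializer or deserializer instance is supplied programmatically, so that the later config.logUnused() call does not warn about the corresponding key. A rough sketch of that pattern (names follow the producer change below):

    if (keySerializer == null) {
        this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
    } else {
        // supplied directly, so mark the key as used to keep logUnused() quiet
        config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        this.keySerializer = keySerializer;
    }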

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
new file mode 100644
index 0000000..37983e4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.util.Map;
+
+public class LongDeserializer implements Deserializer<Long> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public Long deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+        if (data.length != 8) {
+            throw new SerializationException("Size of data received by LongDeserializer is " +
+                    "not 8");
+        }
+
+        long value = 0;
+        for (byte b : data) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
new file mode 100644
index 0000000..3100529
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+public class LongSerializer implements Serializer<Long> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public byte[] serialize(String topic, Long data) {
+        if (data == null)
+            return null;
+
+        return new byte[] {
+            (byte) (data >>> 56),
+            (byte) (data >>> 48),
+            (byte) (data >>> 40),
+            (byte) (data >>> 32),
+            (byte) (data >>> 24),
+            (byte) (data >>> 16),
+            (byte) (data >>> 8),
+            data.byteValue()
+        };
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}
\ No newline at end of file
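
Taken together, the two classes implement a fixed eight-byte big-endian encoding for Long values. A quick round-trip sketch (the topic argument is ignored by both implementations):

    LongSerializer serializer = new LongSerializer();
    LongDeserializer deserializer = new LongDeserializer();
    byte[] bytes = serializer.serialize("any-topic", 42L);      // 8 bytes, most significant first
    Long value = deserializer.deserialize("any-topic", bytes);  // 42L
    // null round-trips unchanged; any input that is not 8 bytes raises SerializationException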

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index c58b741..fa7c92f 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -25,7 +25,10 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Properties;
@@ -524,4 +527,42 @@ public class Utils {
         return existingBuffer;
     }
 
+    /**
+     * Creates a HashSet containing the given elements.
+     * @param elems the elements
+     * @param <T> the type of element
+     * @return a HashSet containing the given elements
+     */
+    public static <T> HashSet<T> mkSet(T... elems) {
+        return new HashSet<>(Arrays.asList(elems));
+    }
+    
+    /**
+     * Recursively delete the given file/directory and any subfiles (if any exist)
+     *
+     * @param file The root file at which to begin deleting
+     */
+    public static void delete(File file) {
+        if (file == null) {
+            return;
+        } else if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            if (files != null) {
+                for (File f : files)
+                    delete(f);
+            }
+            file.delete();
+        } else {
+            file.delete();
+        }
+    }
+
+    /**
+     * Returns the given list, or an empty list if it is null.
+     * @param other the list, possibly null
+     * @return {@code other} if non-null, otherwise an empty list
+     */
+    public static <T> List<T> safe(List<T> other) {
+        return other == null ? Collections.<T>emptyList() : other;
+    }
 }
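
A short usage sketch of the three new helpers (the directory path is only illustrative):

    Set<String> topics = Utils.mkSet("topic1", "topic2");
    List<String> maybeNull = null;
    for (String topic : Utils.safe(maybeNull))       // iterates zero times when the list is null
        System.out.println(topic);
    Utils.delete(new File("/tmp/some-state-dir"));   // recursively deletes the directory, if it exists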

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 9c7fea5..357305b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender',
-        'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+        'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
new file mode 100644
index 0000000..f3a99e0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
+ * sending output to zero or more output topics.
+ * <p>
+ * This processing is defined by using the {@link TopologyBuilder} class or its subclass KStreamBuilder to specify
+ * the transformation.
+ * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
+ * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
+ * <p>
+ * This streaming instance will coordinate with any other instances (whether in this same process, in other processes
+ * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
+ * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shut down or
+ * started in the appropriate processes to balance processing load.
+ * <p>
+ * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
+ * and a {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance, used for writing output and reading input respectively.
+ * <p>
+ * A simple example might look like this:
+ * <pre>
+ *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
+ *    props.put("bootstrap.servers", "localhost:4242");
+ *    props.put("key.deserializer", StringDeserializer.class);
+ *    props.put("value.deserializer", StringDeserializer.class);
+ *    props.put("key.serializer", StringSerializer.class);
+ *    props.put("value.serializer", IntegerSerializer.class);
+ *    props.put("timestamp.extractor", MyTimestampExtractor.class);
+ *    StreamingConfig config = new StreamingConfig(props);
+ *
+ *    KStreamBuilder builder = new KStreamBuilder();
+ *    builder.from("topic1").mapValues(value -&gt; value.length()).to("topic2");
+ *
+ *    KafkaStreaming streaming = new KafkaStreaming(builder, config);
+ *    streaming.start();
+ * </pre>
+ *
+ */
+public class KafkaStreaming {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
+
+    // Container States
+    private static final int CREATED = 0;
+    private static final int RUNNING = 1;
+    private static final int STOPPED = 2;
+    private int state = CREATED;
+
+    private final StreamThread[] threads;
+
+    public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
+        this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
+        for (int i = 0; i < this.threads.length; i++) {
+            this.threads[i] = new StreamThread(builder, config);
+        }
+    }
+
+    /**
+     * Start the stream process by starting all its threads
+     */
+    public synchronized void start() {
+        log.debug("Starting Kafka Stream process");
+
+        if (state == CREATED) {
+            for (StreamThread thread : threads)
+                thread.start();
+
+            state = RUNNING;
+
+            log.info("Started Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process was already started.");
+        }
+    }
+
+    /**
+     * Shut down this stream process by signaling its threads to stop,
+     * waiting for them to join, and cleaning up the process instance.
+     */
+    public synchronized void close() {
+        log.debug("Stopping Kafka Stream process");
+
+        if (state == RUNNING) {
+            // signal the threads to stop and wait
+            for (StreamThread thread : threads)
+                thread.close();
+
+            for (StreamThread thread : threads) {
+                try {
+                    thread.join();
+                } catch (InterruptedException ex) {
+                    Thread.interrupted();
+                }
+            }
+
+            state = STOPPED;
+
+            log.info("Stopped Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process has not started yet.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
new file mode 100644
index 0000000..dce69b6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Map;
+
+public class StreamingConfig extends AbstractConfig {
+
+    private static final ConfigDef CONFIG;
+
+    /** <code>state.dir</code> */
+    public static final String STATE_DIR_CONFIG = "state.dir";
+    private static final String STATE_DIR_DOC = "Directory location for state store.";
+
+    /** <code>commit.interval.ms</code> */
+    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
+    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
+
+    /** <code>poll.ms</code> */
+    public static final String POLL_MS_CONFIG = "poll.ms";
+    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
+
+    /** <code>num.stream.threads</code> */
+    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
+    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+
+    /** <code>buffered.records.per.partition</code> */
+    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
+    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
+
+    /** <code>state.cleanup.delay</code> */
+    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
+    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
+
+    /** <code>total.records.to.process</code> */
+    public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
+    private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
+
+    /** <code>window.time.ms</code> */
+    public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
+    private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to be called "
+                                                     + "at this frequency even if there are no messages.";
+
+    /** <code>timestamp.extractor</code> */
+    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
+    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+
+    /** <code>client.id</code> */
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
+    /** <code>key.serializer</code> */
+    public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+    /** <code>value.serializer</code> */
+    public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+    /** <code>key.deserializer</code> */
+    public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+    /** <code>value.deserializer</code> */
+    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+    /**
+     * <code>bootstrap.servers</code>
+     */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+    private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+
+    static {
+        CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(STATE_DIR_CONFIG,
+                                        Type.STRING,
+                                        SYSTEM_TEMP_DIRECTORY,
+                                        Importance.MEDIUM,
+                                        STATE_DIR_DOC)
+                                .define(COMMIT_INTERVAL_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        Importance.HIGH,
+                                        COMMIT_INTERVAL_MS_DOC)
+                                .define(POLL_MS_CONFIG,
+                                        Type.LONG,
+                                        100,
+                                        Importance.LOW,
+                                        POLL_MS_DOC)
+                                .define(NUM_STREAM_THREADS_CONFIG,
+                                        Type.INT,
+                                        1,
+                                        Importance.LOW,
+                                        NUM_STREAM_THREADS_DOC)
+                                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                                        Type.INT,
+                                        1000,
+                                        Importance.LOW,
+                                        BUFFERED_RECORDS_PER_PARTITION_DOC)
+                                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+                                        Type.LONG,
+                                        60000,
+                                        Importance.LOW,
+                                        STATE_CLEANUP_DELAY_MS_DOC)
+                                .define(TOTAL_RECORDS_TO_PROCESS,
+                                        Type.LONG,
+                                        -1L,
+                                        Importance.LOW,
+                                        TOTAL_RECORDS_TO_DOC)
+                                .define(WINDOW_TIME_MS_CONFIG,
+                                        Type.LONG,
+                                        -1L,
+                                        Importance.MEDIUM,
+                                        WINDOW_TIME_MS_DOC)
+                                .define(KEY_SERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
+                                .define(KEY_DESERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
+                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
+                                .define(BOOTSTRAP_SERVERS_CONFIG,
+                                        Type.STRING,
+                                        Importance.HIGH,
+                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC);
+    }
+
+    public StreamingConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+    public Map<String, Object> getConsumerConfigs() {
+        Map<String, Object> props = this.originals();
+
+        // set consumer default property values
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range");
+
+        // remove properties that are not required for consumers
+        props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+        return props;
+    }
+
+    public Map<String, Object> getProducerConfigs() {
+        Map<String, Object> props = this.originals();
+
+        // set producer default property values
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+        // remove properties that are not required for producers
+        props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+        return props;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(CONFIG.toHtmlTable());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
new file mode 100644
index 0000000..feb4ee7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KafkaStreaming;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+
+import java.util.Properties;
+
+public class KStreamJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job");
+        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        StreamingConfig config = new StreamingConfig(props);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> stream1 = builder.from("topic1");
+
+        KStream<String, Integer> stream2 =
+            stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+                @Override
+                public KeyValue<String, Integer> apply(String key, String value) {
+                    return new KeyValue<>(key, new Integer(value));
+                }
+            }).filter(new Predicate<String, Integer>() {
+                @Override
+                public boolean apply(String key, Integer value) {
+                    return true;
+                }
+            });
+
+        KStream<String, Integer>[] streams = stream2.branch(
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean apply(String key, Integer value) {
+                    return (value % 2) == 0;
+                }
+            },
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean apply(String key, Integer value) {
+                    return true;
+                }
+            }
+        );
+
+        streams[0].to("topic2");
+        streams[1].to("topic3");
+
+        KafkaStreaming kstream = new KafkaStreaming(builder, config);
+        kstream.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
new file mode 100644
index 0000000..0b3aba8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreaming;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Properties;
+
+public class ProcessorJob {
+
+    private static class MyProcessorDef implements ProcessorDef {
+
+        @Override
+        public Processor<String, String> instance() {
+            return new Processor<String, String>() {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(1000);
+                    this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
+                }
+
+                @Override
+                public void process(String key, String value) {
+                    Integer oldValue = this.kvStore.get(key);
+                    Integer newValue = Integer.parseInt(value);
+                    if (oldValue == null) {
+                        this.kvStore.put(key, newValue);
+                    } else {
+                        this.kvStore.put(key, oldValue + newValue);
+                    }
+
+                    context.commit();
+                }
+
+                @Override
+                public void punctuate(long timestamp) {
+                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+                    while (iter.hasNext()) {
+                        Entry<String, Integer> entry = iter.next();
+
+                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");
+
+                        context.forward(entry.key(), entry.value());
+                    }
+
+                    iter.close();
+                }
+
+                @Override
+                public void close() {
+                    this.kvStore.close();
+                }
+            };
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
+        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        StreamingConfig config = new StreamingConfig(props);
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
+
+        builder.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE");
+
+        builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
+
+        KafkaStreaming streaming = new KafkaStreaming(builder, config);
+        streaming.start();
+    }
+}
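
For context, ProcessorJob consumes string keys and string-encoded integer values from topic-source. A minimal sketch of a test driver that feeds it records using the plain producer client (the topic, keys and values here are illustrative, not part of this commit):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
    // values are string-encoded integers because ProcessorJob parses them with Integer.parseInt()
    producer.send(new ProducerRecord<String, String>("topic-source", "page-1", "1"));
    producer.send(new ProducerRecord<String, String>("topic-source", "page-1", "2")); // the store then holds page-1 -> 3
    producer.close();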

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
new file mode 100644
index 0000000..26281d6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public class WallclockTimestampExtractor implements TimestampExtractor {
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        return System.currentTimeMillis();
+    }
+}
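
WallclockTimestampExtractor ignores the record entirely and assigns processing time. An event-time policy only needs the same single-method interface; a hypothetical variant (not part of this commit) that assumes the record value was deserialized to a Long holding the event time in epoch milliseconds:

    package org.apache.kafka.streams.examples;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class ValueTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            // assumes the value was read with LongDeserializer
            return (Long) record.value();
        }
    }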

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
new file mode 100644
index 0000000..7f101ab
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+/**
+ * KStream is an abstraction of a stream of key-value pairs.
+ */
+public interface KStream<K, V> {
+
+    /**
+     * Creates a new stream consisting of all elements of this stream that satisfy the given predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return KStream
+     */
+    KStream<K, V> filter(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new stream consisting of all elements of this stream that do not satisfy the given predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return KStream
+     */
+    KStream<K, V> filterOut(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new stream by applying a mapper to each element of this stream, producing a new key-value pair
+     *
+     * @param mapper the instance of KeyValueMapper
+     * @param <K1>   the key type of the new stream
+     * @param <V1>   the value type of the new stream
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
+
+    /**
+     * Creates a new stream by applying a mapper to the value of each element of this stream
+     *
+     * @param mapper the instance of ValueMapper
+     * @param <V1>   the value type of the new stream
+     * @return KStream
+     */
+    <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+    /**
+     * Creates a new stream by applying a mapper to each element of this stream and emitting each key-value pair in the resulting Iterable
+     *
+     * @param mapper the instance of KeyValueMapper
+     * @param <K1>   the key type of the new stream
+     * @param <V1>   the value type of the new stream
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
+
+    /**
+     * Creates a new stream by applying a mapper to the value of each element of this stream and emitting each value in the resulting Iterable
+     *
+     * @param mapper the instance of ValueMapper
+     * @param <V1>   the value type of the new stream
+     * @return KStream
+     */
+    <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper);
+
+    /**
+     * Creates a new windowed stream using a specified window instance.
+     *
+     * @param windowDef the instance of WindowDef
+     * @return KStream
+     */
+    KStreamWindowed<K, V> with(WindowDef<K, V> windowDef);
+
+    /**
+     * Creates an array of streams from this stream. Each stream in the array corresponds to one of the
+     * supplied predicates, in the same order. Predicates are evaluated in order, and an element is routed
+     * to the stream of the first predicate that evaluates to true.
+     * An element is dropped if none of the predicates evaluate to true.
+     *
+     * @param predicates Instances of Predicate
+     * @return KStream
+     */
+    KStream<K, V>[] branch(Predicate<K, V>... predicates);
+
+    /**
+     * Sends the elements of this stream to a topic, and creates a new stream from that topic.
+     * This is equivalent to calling to(topic) and from(topic).
+     *
+     * @param topic           the topic name
+     * @param <K1>            the key type of the new stream
+     * @param <V1>            the value type of the new stream
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> through(String topic);
+
+    /**
+     * Sends the elements of this stream to a topic, and creates a new stream from that topic.
+     * This is equivalent to calling to(topic) and from(topic).
+     *
+     * @param topic           the topic name
+     * @param keySerializer   key serializer used to send key-value pairs,
+     *                        if not specified the default serializer defined in the configs will be used
+     * @param valSerializer   value serializer used to send key-value pairs,
+     *                        if not specified the default serializer defined in the configs will be used
+     * @param keyDeserializer key deserializer used to create the new KStream,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param valDeserializer value deserializer used to create the new KStream,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param <K1>            the key type of the new stream
+     * @param <V1>            the value type of the new stream
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer);
+
+    /**
+     * Sends the elements of this stream to a topic using the default serializers specified in the config.
+     *
+     * @param topic         the topic name
+     */
+    void to(String topic);
+
+    /**
+     * Sends the elements of this stream to a topic.
+     *
+     * @param topic         the topic name
+     * @param keySerializer key serializer used to send key-value pairs,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param valSerializer value serializer used to send key-value pairs,
+     *                      if not specified the default serializer defined in the configs will be used
+     */
+    void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+
+    /**
+     * Creates a new stream by processing each element of this stream with the supplied processor.
+     *
+     * @param processorDef the instance of ProcessorDef
+     */
+    <K1, V1> KStream<K1, V1> process(ProcessorDef processorDef);
+}
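
Since all of these callback interfaces are single-method, a pipeline is written by chaining calls with anonymous classes. A minimal sketch (topic names illustrative; serializer imports from org.apache.kafka.common.serialization assumed) using the KStreamBuilder defined below:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source =
        builder.from(new StringDeserializer(), new StringDeserializer(), "events");

    source.filter(new Predicate<String, String>() {
              @Override
              public boolean apply(String key, String value) {
                  return value.length() > 0; // drop empty payloads
              }
          })
          .mapValues(new ValueMapper<String, Integer>() {
              @Override
              public Integer apply(String value) {
                  return value.length(); // value type changes from String to Integer
              }
          })
          .to("event-lengths", new StringSerializer(), new IntegerSerializer());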

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
new file mode 100644
index 0000000..2d4dcc7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+
+/**
+ * KStreamBuilder is the builder class for creating KStream instances.
+ */
+public class KStreamBuilder extends TopologyBuilder {
+
+    public KStreamBuilder() {
+        super();
+    }
+
+    /**
+     * Creates a KStream instance for the specified topics. The stream is added to the default synchronization group.
+     * The default deserializers specified in the config are used.
+     *
+     * @param topics          the topic names; if empty, defaults to all the topics in the config
+     * @return KStream
+     */
+    public <K, V> KStream<K, V> from(String... topics) {
+        String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
+
+        addSource(name, topics);
+
+        return new KStreamImpl<>(this, name);
+    }
+
+    /**
+     * Creates a KStream instance for the specified topics. The stream is added to the default synchronization group.
+     *
+     * @param keyDeserializer key deserializer used to read this source KStream,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param valDeserializer value deserializer used to read this source KStream,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param topics          the topic names; if empty, defaults to all the topics in the config
+     * @return KStream
+     */
+    public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) {
+        String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
+
+        addSource(name, keyDeserializer, valDeserializer, topics);
+
+        return new KStreamImpl<>(this, name);
+    }
+}
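
Because KStreamBuilder extends TopologyBuilder, the same instance is handed directly to KafkaStreaming. A sketch assuming the default deserializers and serializers are set in the StreamingConfig (topic names illustrative, config built as in ProcessorJob above):

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> stream = builder.from("topic-a", "topic-b"); // default deserializers from config
    stream.to("topic-out"); // default serializers from config

    KafkaStreaming streaming = new KafkaStreaming(builder, config);
    streaming.start();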

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
new file mode 100644
index 0000000..4d73128
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * KStreamWindowed is an abstraction of a stream of key-value pairs with a window.
+ */
+public interface KStreamWindowed<K, V> extends KStream<K, V> {
+
+    /**
+     * Creates a new stream by joining this windowed stream with the other windowed stream.
+     * Each element arriving in either stream is joined with the elements within the window of the other stream.
+     * The resulting values are computed by applying a joiner.
+     *
+     * @param other  the other windowed stream
+     * @param joiner ValueJoiner
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     * @return KStream
+     */
+    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
+}
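
A windowed join is set up by wrapping each stream via KStream.with(...) and a window definition such as SlidingWindowDef (defined later in this patch). A sketch reusing the builder from the earlier example, with illustrative names, a one-minute window and at most 1000 retained entries per side:

    KStream<String, String> left =
        builder.from(new StringDeserializer(), new StringDeserializer(), "left-topic");
    KStream<String, String> right =
        builder.from(new StringDeserializer(), new StringDeserializer(), "right-topic");

    KStreamWindowed<String, String> leftWin = left.with(new SlidingWindowDef<String, String>(
        "left-window", 60000L, 1000,
        new StringSerializer(), new StringSerializer(),
        new StringDeserializer(), new StringDeserializer()));
    KStreamWindowed<String, String> rightWin = right.with(new SlidingWindowDef<String, String>(
        "right-window", 60000L, 1000,
        new StringSerializer(), new StringSerializer(),
        new StringDeserializer(), new StringDeserializer()));

    KStream<String, String> joined = leftWin.join(rightWin,
        new ValueJoiner<String, String, String>() {
            @Override
            public String apply(String leftValue, String rightValue) {
                return leftValue + "," + rightValue;
            }
        });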

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
new file mode 100644
index 0000000..f633f6e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class KeyValue<K, V> {
+
+    public final K key;
+    public final V value;
+
+    public KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public static <K, V> KeyValue<K, V> pair(K key, V value) {
+        return new KeyValue<>(key, value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
new file mode 100644
index 0000000..62b07f6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface KeyValueMapper<K, V, R> {
+
+    R apply(K key, V value);
+}
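
KeyValue above is the return type for KeyValueMapper-based operations such as KStream.map(). For illustration, assuming stream is a KStream<String, Integer>, a mapper that swaps key and value:

    KStream<Integer, String> swapped = stream.map(
        new KeyValueMapper<String, Integer, KeyValue<Integer, String>>() {
            @Override
            public KeyValue<Integer, String> apply(String key, Integer value) {
                return KeyValue.pair(value, key); // emit (value, key)
            }
        });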

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
new file mode 100644
index 0000000..9cdb3bc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Predicate<K, V> {
+
+    boolean apply(K key, V value);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
new file mode 100644
index 0000000..cc03541
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.FilteredIterator;
+import org.apache.kafka.streams.kstream.internals.WindowSupport;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.internals.Stamped;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
+    private final String name;
+    private final long duration;
+    private final int maxCount;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+
+    public SlidingWindowDef(
+            String name,
+            long duration,
+            int maxCount,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V> valueDeserializer) {
+        this.name = name;
+        this.duration = duration;
+        this.maxCount = maxCount;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Window<K, V> instance() {
+        return new SlidingWindow();
+    }
+
+    public class SlidingWindow extends WindowSupport implements Window<K, V> {
+        private final Object lock = new Object();
+        private ProcessorContext context;
+        private int slotNum; // used as a key for Kafka log compaction
+        private LinkedList<K> list = new LinkedList<K>();
+        private HashMap<K, ValueList<V>> map = new HashMap<>();
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.context = context;
+            RestoreFuncImpl restoreFunc = new RestoreFuncImpl();
+            context.register(this, restoreFunc);
+
+            for (ValueList<V> valueList : map.values()) {
+                valueList.clearDirtyValues();
+            }
+            this.slotNum = restoreFunc.slotNum;
+        }
+
+        @Override
+        public Iterator<V> findAfter(K key, final long timestamp) {
+            return find(key, timestamp, timestamp + duration);
+        }
+
+        @Override
+        public Iterator<V> findBefore(K key, final long timestamp) {
+            return find(key, timestamp - duration, timestamp);
+        }
+
+        @Override
+        public Iterator<V> find(K key, final long timestamp) {
+            return find(key, timestamp - duration, timestamp + duration);
+        }
+
+        /*
+         * finds items in the window between startTime and endTime (both inclusive)
+         */
+        private Iterator<V> find(K key, final long startTime, final long endTime) {
+            final ValueList<V> values = map.get(key);
+
+            if (values == null) {
+                return Collections.emptyIterator();
+            } else {
+                return new FilteredIterator<V, Value<V>>(values.iterator()) {
+                    @Override
+                    protected V filter(Value<V> item) {
+                        if (startTime <= item.timestamp && item.timestamp <= endTime)
+                            return item.value;
+                        else
+                            return null;
+                    }
+                };
+            }
+        }
+
+        @Override
+        public void put(K key, V value, long timestamp) {
+            synchronized (lock) {
+                slotNum++;
+
+                list.offerLast(key);
+
+                ValueList<V> values = map.get(key);
+                if (values == null) {
+                    values = new ValueList<>();
+                    map.put(key, values);
+                }
+
+                values.add(slotNum, value, timestamp);
+            }
+            evictExcess();
+            evictExpired(timestamp - duration);
+        }
+
+        private void evictExcess() {
+            while (list.size() > maxCount) {
+                K oldestKey = list.pollFirst();
+
+                ValueList<V> values = map.get(oldestKey);
+                values.removeFirst();
+
+                if (values.isEmpty()) map.remove(oldestKey);
+            }
+        }
+
+        private void evictExpired(long cutoffTime) {
+            while (true) {
+                K oldestKey = list.peekFirst();
+                if (oldestKey == null) return; // stop once the window is empty
+
+                ValueList<V> values = map.get(oldestKey);
+                Stamped<V> oldestValue = values.first();
+
+                if (oldestValue.timestamp < cutoffTime) {
+                    list.pollFirst();
+                    values.removeFirst();
+
+                    if (values.isEmpty()) map.remove(oldestKey);
+                } else {
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public void flush() {
+            IntegerSerializer intSerializer = new IntegerSerializer();
+            ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+
+            RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
+
+            for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
+                ValueList<V> values = entry.getValue();
+                if (values.hasDirtyValues()) {
+                    K key = entry.getKey();
+
+                    byte[] keyBytes = keySerializer.serialize(name, key);
+
+                    Iterator<Value<V>> iterator = values.dirtyValueIterator();
+                    while (iterator.hasNext()) {
+                        Value<V> dirtyValue = iterator.next();
+                        byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
+                        byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
+
+                        // changelog value layout: [8-byte timestamp][4-byte key length][key bytes][4-byte value length][value bytes]
+                        byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
+
+                        int offset = 0;
+                        offset += putLong(combined, offset, dirtyValue.timestamp);
+                        offset += puts(combined, offset, keyBytes);
+                        offset += puts(combined, offset, valBytes);
+
+                        if (offset != combined.length)
+                            throw new IllegalStateException("serialized length does not match");
+
+                        collector.send(new ProducerRecord<>(name, context.id(), slot, combined), byteArraySerializer, byteArraySerializer);
+                    }
+                    values.clearDirtyValues();
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            // TODO
+        }
+
+        @Override
+        public boolean persistent() {
+            // TODO: should not be persistent, right?
+            return false;
+        }
+
+        private class RestoreFuncImpl implements RestoreFunc {
+
+            final IntegerDeserializer intDeserializer;
+            int slotNum = 0;
+
+            RestoreFuncImpl() {
+                intDeserializer = new IntegerDeserializer();
+            }
+
+            @Override
+            public void apply(byte[] slot, byte[] bytes) {
+
+                slotNum = intDeserializer.deserialize("", slot);
+
+                int offset = 0;
+                // timestamp
+                long timestamp = getLong(bytes, offset);
+                offset += 8;
+                // key
+                int length = getInt(bytes, offset);
+                offset += 4;
+                K key = deserialize(bytes, offset, length, name, keyDeserializer);
+                offset += length;
+                // value
+                length = getInt(bytes, offset);
+                offset += 4;
+                V value = deserialize(bytes, offset, length, name, valueDeserializer);
+
+                put(key, value, timestamp);
+            }
+        }
+    }
+
+}
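
The three lookup methods of the window differ only in how they bracket the probe timestamp around the configured duration; for example, with duration = 60000:

    window.findAfter(key, t);  // values stamped within [t, t + 60000]
    window.findBefore(key, t); // values stamped within [t - 60000, t]
    window.find(key, t);       // values stamped within [t - 60000, t + 60000]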

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
new file mode 100644
index 0000000..93fc359
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface ValueJoiner<V1, V2, R> {
+
+    R apply(V1 value1, V2 value2);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
new file mode 100644
index 0000000..a32423d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface ValueMapper<V1, V2> {
+
+    V2 apply(V1 value);
+}

