kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-2600: Align Kafka Streams' interfaces with Java 8 functional interfaces
Date Fri, 09 Oct 2015 19:14:47 GMT
KAFKA-2600: Align Kafka Streams' interfaces with Java 8 functional interfaces

A few of Kafka Stream's interfaces and classes are not as well-aligned with Java 8's functional interfaces. By making these changes, when Kafka moves to Java 8 these classes can extend standard Java 8 functional interfaces while remaining backward compatible. This will make it easier for developers to use Kafka Streams, and may allow us to eventually remove these custom interfaces and just use the standard Java 8 interfaces.

The changes include:

1. The 'apply' method of KStream's `Predicate` functional interface was renamed to `test` to match the method name on `java.util.function.BiPredicate`. This will allow KStream's `Predicate` to extend `BiPredicate` when Kafka moves to Java 8, and for the `KStream.filter` and `filterOut` methods to accept `BiPredicate`.
2. Renamed the `ProcessorDef` and `WindowDef` interfaces to `ProcessorSupplier` and `WindowSupplier`, respectively. Also the `SlidingWindowDef` class was renamed to `SlidingWindowSupplier`, and the `MockProcessorDef` test class was renamed to `MockProcessorSupplier`. The `instance()` method in all were renamed to `get()`, so that all of these can extend/implement Java 8's `java.util.function.Supplier<T>` interface in the future with no other changes and while remaining backward compatible. Variable names that used some form of "def" were changed to use "supplier".

These two sets of changes were made in separate commits.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #270 from rhauch/kafka-2600


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7233858b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7233858b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7233858b

Branch: refs/heads/trunk
Commit: 7233858bea1c548153ed00b7edffd55299db7cf2
Parents: 49822ff
Author: Randall Hauch <rhauch@gmail.com>
Authored: Fri Oct 9 12:19:07 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Oct 9 12:19:07 2015 -0700

----------------------------------------------------------------------
 .../kafka/streams/examples/KStreamJob.java      |   6 +-
 .../kafka/streams/examples/ProcessorJob.java    |   8 +-
 .../apache/kafka/streams/kstream/KStream.java   |  52 ++--
 .../apache/kafka/streams/kstream/Predicate.java |   8 +-
 .../kafka/streams/kstream/SlidingWindowDef.java | 265 -------------------
 .../streams/kstream/SlidingWindowSupplier.java  | 265 +++++++++++++++++++
 .../apache/kafka/streams/kstream/WindowDef.java |  25 --
 .../kafka/streams/kstream/WindowSupplier.java   |  25 ++
 .../kstream/internals/KStreamBranch.java        |  10 +-
 .../kstream/internals/KStreamFilter.java        |   8 +-
 .../kstream/internals/KStreamFlatMap.java       |   6 +-
 .../kstream/internals/KStreamFlatMapValues.java |   6 +-
 .../streams/kstream/internals/KStreamImpl.java  |  20 +-
 .../streams/kstream/internals/KStreamJoin.java  |   7 +-
 .../streams/kstream/internals/KStreamMap.java   |   6 +-
 .../kstream/internals/KStreamMapValues.java     |   8 +-
 .../kstream/internals/KStreamPassThrough.java   |   8 +-
 .../kstream/internals/KStreamWindow.java        |  20 +-
 .../kstream/internals/KStreamWindowedImpl.java  |  12 +-
 .../kafka/streams/processor/ProcessorDef.java   |  23 --
 .../streams/processor/ProcessorSupplier.java    |  23 ++
 .../streams/processor/TopologyBuilder.java      |  19 +-
 .../kstream/internals/KStreamBranchTest.java    |  14 +-
 .../kstream/internals/KStreamFilterTest.java    |  12 +-
 .../kstream/internals/KStreamFlatMapTest.java   |   6 +-
 .../internals/KStreamFlatMapValuesTest.java     |   6 +-
 .../kstream/internals/KStreamImplTest.java      |  16 +-
 .../kstream/internals/KStreamJoinTest.java      |   6 +-
 .../kstream/internals/KStreamMapTest.java       |   6 +-
 .../kstream/internals/KStreamMapValuesTest.java |   4 +-
 .../kstream/internals/KStreamWindowedTest.java  |   8 +-
 .../streams/processor/TopologyBuilderTest.java  |  10 +-
 .../internals/ProcessorTopologyTest.java        |  14 +-
 .../org/apache/kafka/test/MockProcessorDef.java |  58 ----
 .../kafka/test/MockProcessorSupplier.java       |  59 +++++
 .../apache/kafka/test/UnlimitedWindowDef.java   |   6 +-
 36 files changed, 533 insertions(+), 522 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index feb4ee7..87368c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -55,7 +55,7 @@ public class KStreamJob {
                 }
             }).filter(new Predicate<String, Integer>() {
                 @Override
-                public boolean apply(String key, Integer value) {
+                public boolean test(String key, Integer value) {
                     return true;
                 }
             });
@@ -63,13 +63,13 @@ public class KStreamJob {
         KStream<String, Integer>[] streams = stream2.branch(
             new Predicate<String, Integer>() {
                 @Override
-                public boolean apply(String key, Integer value) {
+                public boolean test(String key, Integer value) {
                     return (value % 2) == 0;
                 }
             },
             new Predicate<String, Integer>() {
                 @Override
-                public boolean apply(String key, Integer value) {
+                public boolean test(String key, Integer value) {
                     return true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 0b3aba8..92e6284 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.KafkaStreaming;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.StreamingConfig;
@@ -36,10 +36,10 @@ import java.util.Properties;
 
 public class ProcessorJob {
 
-    private static class MyProcessorDef implements ProcessorDef {
+    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
 
         @Override
-        public Processor<String, String> instance() {
+        public Processor<String, String> get() {
             return new Processor<String, String>() {
                 private ProcessorContext context;
                 private KeyValueStore<String, Integer> kvStore;
@@ -102,7 +102,7 @@ public class ProcessorJob {
 
         builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
 
-        builder.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE");
+        builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6c488cf..ecec882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,10 +19,13 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 /**
  * KStream is an abstraction of a stream of key-value pairs.
+ * 
+ * @param <K> the type of keys
+ * @param <V> the type of values
  */
 public interface KStream<K, V> {
 
@@ -30,7 +33,7 @@ public interface KStream<K, V> {
      * Creates a new stream consists of all elements of this stream which satisfy a predicate
      *
      * @param predicate the instance of Predicate
-     * @return KStream
+     * @return the stream with only those elements that satisfy the predicate
      */
     KStream<K, V> filter(Predicate<K, V> predicate);
 
@@ -38,45 +41,45 @@ public interface KStream<K, V> {
      * Creates a new stream consists all elements of this stream which do not satisfy a predicate
      *
      * @param predicate the instance of Predicate
-     * @return KStream
+     * @return the stream with only those elements that do not satisfy the predicate
      */
     KStream<K, V> filterOut(Predicate<K, V> predicate);
 
     /**
-     * Creates a new stream by transforming key-value pairs by a mapper to all elements of this stream
+     * Creates a new stream by applying transforming each element in this stream into a different element in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
      * @param <V1>   the value type of the new stream
-     * @return KStream
+     * @return the mapped stream
      */
     <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
 
     /**
-     * Creates a new stream by transforming values by a mapper to all values of this stream
+     * Creates a new stream by applying transforming each value in this stream into a different value in the new stream.
      *
      * @param mapper the instance of ValueMapper
      * @param <V1>   the value type of the new stream
-     * @return KStream
+     * @return the mapped stream
      */
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Creates a new stream by applying a mapper to all elements of this stream and using the values in the resulting Iterable
+     * Creates a new stream by applying transforming each element in this stream into zero or more elements in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
      * @param <V1>   the value type of the new stream
-     * @return KStream
+     * @return the mapped stream
      */
     <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
 
     /**
-     * Creates a new stream by applying a mapper to all values of this stream and using the values in the resulting Iterable
+     * Creates a new stream by applying transforming each value in this stream into zero or more values in the new stream.
      *
      * @param processor the instance of Processor
      * @param <V1>      the value type of the new stream
-     * @return KStream
+     * @return the mapped stream
      */
     <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
 
@@ -84,18 +87,18 @@ public interface KStream<K, V> {
      * Creates a new windowed stream using a specified window instance.
      *
      * @param windowDef the instance of Window
-     * @return KStream
+     * @return the windowed stream
      */
-    KStreamWindowed<K, V> with(WindowDef<K, V> windowDef);
+    KStreamWindowed<K, V> with(WindowSupplier<K, V> windowDef);
 
     /**
-     * Creates an array of streams from this stream. Each stream in the array coresponds to a predicate in
+     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
      * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to
      * a corresponding stream for the first predicate is evaluated true.
      * An element will be dropped if none of the predicates evaluate true.
      *
-     * @param predicates Instances of Predicate
-     * @return KStream
+     * @param predicates the ordered list of Predicate instances
+     * @return the new streams that each contain those elements for which their Predicate evaluated to true.
      */
     KStream<K, V>[] branch(Predicate<K, V>... predicates);
 
@@ -106,7 +109,7 @@ public interface KStream<K, V> {
      * @param topic           the topic name
      * @param <K1>            the key type of the new stream
      * @param <V1>            the value type of the new stream
-     * @return KStream
+     * @return the new stream that consumes the given topic
      */
     <K1, V1> KStream<K1, V1> through(String topic);
 
@@ -116,16 +119,16 @@ public interface KStream<K, V> {
      *
      * @param topic           the topic name
      * @param keySerializer   key serializer used to send key-value pairs,
-     *                        if not specified the default serializer defined in the configs will be used
+     *                        if not specified the default key serializer defined in the configuration will be used
      * @param valSerializer   value serializer used to send key-value pairs,
-     *                        if not specified the default serializer defined in the configs will be used
+     *                        if not specified the default value serializer defined in the configuration will be used
      * @param keyDeserializer key deserializer used to create the new KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
+     *                        if not specified the default key deserializer defined in the configuration will be used
      * @param valDeserializer value deserializer used to create the new KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
+     *                        if not specified the default value deserializer defined in the configuration will be used
      * @param <K1>            the key type of the new stream
      * @param <V1>            the value type of the new stream
-     * @return KStream
+     * @return the new stream that consumes the given topic
      */
     <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer);
 
@@ -150,7 +153,8 @@ public interface KStream<K, V> {
     /**
      * Processes all elements in this stream by applying a processor.
      *
-     * @param processorDef the class of ProcessorDef
+     * @param processorSupplier the supplier of the Processor to use
+     * @return the new stream containing the processed output
      */
-    <K1, V1> KStream<K1, V1> process(ProcessorDef<K, V> processorDef);
+    <K1, V1> KStream<K1, V1> process(ProcessorSupplier<K, V> processorSupplier);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index 9cdb3bc..c73622e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -17,8 +17,14 @@
 
 package org.apache.kafka.streams.kstream;
 
+/**
+ * Represents a predicate (boolean-valued function) of two arguments.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
+ */
 public interface Predicate<K, V> {
 
-    boolean apply(K key, V value);
+    boolean test(K key, V value);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
deleted file mode 100644
index 5927db6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.internals.FilteredIterator;
-import org.apache.kafka.streams.kstream.internals.WindowSupport;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
-    private final String name;
-    private final long duration;
-    private final int maxCount;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-
-    public SlidingWindowDef(
-            String name,
-            long duration,
-            int maxCount,
-            Serializer<K> keySerializer,
-            Serializer<V> valueSerializer,
-            Deserializer<K> keyDeseriaizer,
-            Deserializer<V> valueDeserializer) {
-        this.name = name;
-        this.duration = duration;
-        this.maxCount = maxCount;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
-        this.keyDeserializer = keyDeseriaizer;
-        this.valueDeserializer = valueDeserializer;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public Window<K, V> instance() {
-        return new SlidingWindow();
-    }
-
-    public class SlidingWindow extends WindowSupport implements Window<K, V> {
-        private final Object lock = new Object();
-        private ProcessorContext context;
-        private int slotNum; // used as a key for Kafka log compaction
-        private LinkedList<K> list = new LinkedList<K>();
-        private HashMap<K, ValueList<V>> map = new HashMap<>();
-
-        @Override
-        public void init(ProcessorContext context) {
-            this.context = context;
-            SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
-            context.register(this, restoreFunc);
-
-            for (ValueList<V> valueList : map.values()) {
-                valueList.clearDirtyValues();
-            }
-            this.slotNum = restoreFunc.slotNum;
-        }
-
-        @Override
-        public Iterator<V> findAfter(K key, final long timestamp) {
-            return find(key, timestamp, timestamp + duration);
-        }
-
-        @Override
-        public Iterator<V> findBefore(K key, final long timestamp) {
-            return find(key, timestamp - duration, timestamp);
-        }
-
-        @Override
-        public Iterator<V> find(K key, final long timestamp) {
-            return find(key, timestamp - duration, timestamp + duration);
-        }
-
-        /*
-         * finds items in the window between startTime and endTime (both inclusive)
-         */
-        private Iterator<V> find(K key, final long startTime, final long endTime) {
-            final ValueList<V> values = map.get(key);
-
-            if (values == null) {
-                return Collections.emptyIterator();
-            } else {
-                return new FilteredIterator<V, Value<V>>(values.iterator()) {
-                    @Override
-                    protected V filter(Value<V> item) {
-                        if (startTime <= item.timestamp && item.timestamp <= endTime)
-                            return item.value;
-                        else
-                            return null;
-                    }
-                };
-            }
-        }
-
-        @Override
-        public void put(K key, V value, long timestamp) {
-            synchronized (lock) {
-                slotNum++;
-
-                list.offerLast(key);
-
-                ValueList<V> values = map.get(key);
-                if (values == null) {
-                    values = new ValueList<>();
-                    map.put(key, values);
-                }
-
-                values.add(slotNum, value, timestamp);
-            }
-            evictExcess();
-            evictExpired(timestamp - duration);
-        }
-
-        private void evictExcess() {
-            while (list.size() > maxCount) {
-                K oldestKey = list.pollFirst();
-
-                ValueList<V> values = map.get(oldestKey);
-                values.removeFirst();
-
-                if (values.isEmpty()) map.remove(oldestKey);
-            }
-        }
-
-        private void evictExpired(long cutoffTime) {
-            while (true) {
-                K oldestKey = list.peekFirst();
-
-                ValueList<V> values = map.get(oldestKey);
-                Stamped<V> oldestValue = values.first();
-
-                if (oldestValue.timestamp < cutoffTime) {
-                    list.pollFirst();
-                    values.removeFirst();
-
-                    if (values.isEmpty()) map.remove(oldestKey);
-                } else {
-                    break;
-                }
-            }
-        }
-
-        @Override
-        public String name() {
-            return name;
-        }
-
-        @Override
-        public void flush() {
-            IntegerSerializer intSerializer = new IntegerSerializer();
-            ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-
-            RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
-
-            for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
-                ValueList<V> values = entry.getValue();
-                if (values.hasDirtyValues()) {
-                    K key = entry.getKey();
-
-                    byte[] keyBytes = keySerializer.serialize(name, key);
-
-                    Iterator<Value<V>> iterator = values.dirtyValueIterator();
-                    while (iterator.hasNext()) {
-                        Value<V> dirtyValue = iterator.next();
-                        byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
-                        byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
-
-                        byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
-
-                        int offset = 0;
-                        offset += putLong(combined, offset, dirtyValue.timestamp);
-                        offset += puts(combined, offset, keyBytes);
-                        offset += puts(combined, offset, valBytes);
-
-                        if (offset != combined.length)
-                            throw new IllegalStateException("serialized length does not match");
-
-                        collector.send(new ProducerRecord<>(name, context.id(), slot, combined), byteArraySerializer, byteArraySerializer);
-                    }
-                    values.clearDirtyValues();
-                }
-            }
-        }
-
-        @Override
-        public void close() {
-            // TODO
-        }
-
-        @Override
-        public boolean persistent() {
-            // TODO: should not be persistent, right?
-            return false;
-        }
-
-        private class SlidingWindowRegistryCallback implements StateRestoreCallback {
-
-            final IntegerDeserializer intDeserializer;
-            int slotNum = 0;
-
-            SlidingWindowRegistryCallback() {
-                intDeserializer = new IntegerDeserializer();
-            }
-
-            @Override
-            public void restore(byte[] slot, byte[] bytes) {
-
-                slotNum = intDeserializer.deserialize("", slot);
-
-                int offset = 0;
-                // timestamp
-                long timestamp = getLong(bytes, offset);
-                offset += 8;
-                // key
-                int length = getInt(bytes, offset);
-                offset += 4;
-                K key = deserialize(bytes, offset, length, name, keyDeserializer);
-                offset += length;
-                // value
-                length = getInt(bytes, offset);
-                offset += 4;
-                V value = deserialize(bytes, offset, length, name, valueDeserializer);
-
-                put(key, value, timestamp);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
new file mode 100644
index 0000000..0110c87
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.FilteredIterator;
+import org.apache.kafka.streams.kstream.internals.WindowSupport;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.Stamped;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
+    private final String name;
+    private final long duration;
+    private final int maxCount;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+
+    public SlidingWindowSupplier(
+            String name,
+            long duration,
+            int maxCount,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            Deserializer<K> keyDeseriaizer,
+            Deserializer<V> valueDeserializer) {
+        this.name = name;
+        this.duration = duration;
+        this.maxCount = maxCount;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyDeserializer = keyDeseriaizer;
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Window<K, V> get() {
+        return new SlidingWindow();
+    }
+
+    public class SlidingWindow extends WindowSupport implements Window<K, V> {
+        private final Object lock = new Object();
+        private ProcessorContext context;
+        private int slotNum; // used as a key for Kafka log compaction
+        private LinkedList<K> list = new LinkedList<K>();
+        private HashMap<K, ValueList<V>> map = new HashMap<>();
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.context = context;
+            SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
+            context.register(this, restoreFunc);
+
+            for (ValueList<V> valueList : map.values()) {
+                valueList.clearDirtyValues();
+            }
+            this.slotNum = restoreFunc.slotNum;
+        }
+
+        @Override
+        public Iterator<V> findAfter(K key, final long timestamp) {
+            return find(key, timestamp, timestamp + duration);
+        }
+
+        @Override
+        public Iterator<V> findBefore(K key, final long timestamp) {
+            return find(key, timestamp - duration, timestamp);
+        }
+
+        @Override
+        public Iterator<V> find(K key, final long timestamp) {
+            return find(key, timestamp - duration, timestamp + duration);
+        }
+
+        /*
+         * finds items in the window between startTime and endTime (both inclusive)
+         */
+        private Iterator<V> find(K key, final long startTime, final long endTime) {
+            final ValueList<V> values = map.get(key);
+
+            if (values == null) {
+                return Collections.emptyIterator();
+            } else {
+                return new FilteredIterator<V, Value<V>>(values.iterator()) {
+                    @Override
+                    protected V filter(Value<V> item) {
+                        if (startTime <= item.timestamp && item.timestamp <= endTime)
+                            return item.value;
+                        else
+                            return null;
+                    }
+                };
+            }
+        }
+
+        @Override
+        public void put(K key, V value, long timestamp) {
+            synchronized (lock) {
+                slotNum++;
+
+                list.offerLast(key);
+
+                ValueList<V> values = map.get(key);
+                if (values == null) {
+                    values = new ValueList<>();
+                    map.put(key, values);
+                }
+
+                values.add(slotNum, value, timestamp);
+            }
+            evictExcess();
+            evictExpired(timestamp - duration);
+        }
+
+        private void evictExcess() {
+            while (list.size() > maxCount) {
+                K oldestKey = list.pollFirst();
+
+                ValueList<V> values = map.get(oldestKey);
+                values.removeFirst();
+
+                if (values.isEmpty()) map.remove(oldestKey);
+            }
+        }
+
+        private void evictExpired(long cutoffTime) {
+            while (true) {
+                K oldestKey = list.peekFirst();
+
+                ValueList<V> values = map.get(oldestKey);
+                Stamped<V> oldestValue = values.first();
+
+                if (oldestValue.timestamp < cutoffTime) {
+                    list.pollFirst();
+                    values.removeFirst();
+
+                    if (values.isEmpty()) map.remove(oldestKey);
+                } else {
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public void flush() {
+            IntegerSerializer intSerializer = new IntegerSerializer();
+            ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+
+            RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
+
+            for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
+                ValueList<V> values = entry.getValue();
+                if (values.hasDirtyValues()) {
+                    K key = entry.getKey();
+
+                    byte[] keyBytes = keySerializer.serialize(name, key);
+
+                    Iterator<Value<V>> iterator = values.dirtyValueIterator();
+                    while (iterator.hasNext()) {
+                        Value<V> dirtyValue = iterator.next();
+                        byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
+                        byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
+
+                        byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
+
+                        int offset = 0;
+                        offset += putLong(combined, offset, dirtyValue.timestamp);
+                        offset += puts(combined, offset, keyBytes);
+                        offset += puts(combined, offset, valBytes);
+
+                        if (offset != combined.length)
+                            throw new IllegalStateException("serialized length does not match");
+
+                        collector.send(new ProducerRecord<>(name, context.id(), slot, combined), byteArraySerializer, byteArraySerializer);
+                    }
+                    values.clearDirtyValues();
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            // TODO
+        }
+
+        @Override
+        public boolean persistent() {
+            // TODO: should not be persistent, right?
+            return false;
+        }
+
+        private class SlidingWindowRegistryCallback implements StateRestoreCallback {
+
+            final IntegerDeserializer intDeserializer;
+            int slotNum = 0;
+
+            SlidingWindowRegistryCallback() {
+                intDeserializer = new IntegerDeserializer();
+            }
+
+            @Override
+            public void restore(byte[] slot, byte[] bytes) {
+
+                slotNum = intDeserializer.deserialize("", slot);
+
+                int offset = 0;
+                // timestamp
+                long timestamp = getLong(bytes, offset);
+                offset += 8;
+                // key
+                int length = getInt(bytes, offset);
+                offset += 4;
+                K key = deserialize(bytes, offset, length, name, keyDeserializer);
+                offset += length;
+                // value
+                length = getInt(bytes, offset);
+                offset += 4;
+                V value = deserialize(bytes, offset, length, name, valueDeserializer);
+
+                put(key, value, timestamp);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
deleted file mode 100644
index bbc5979..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface WindowDef<K, V> {
-
-    String name();
-
-    Window<K, V> instance();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
new file mode 100644
index 0000000..46a2b9e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface WindowSupplier<K, V> {
+
+    String name();
+
+    Window<K, V> get();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index c806147..06083b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -19,20 +19,20 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.kstream.Predicate;
 
-class KStreamBranch<K, V> implements ProcessorDef<K, V> {
+class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
 
     private final Predicate<K, V>[] predicates;
 
     @SuppressWarnings("unchecked")
-    public KStreamBranch(Predicate... predicates) {
+    public KStreamBranch(Predicate<K, V> ... predicates) {
         this.predicates = predicates;
     }
 
     @Override
-    public Processor<K, V> instance() {
+    public Processor<K, V> get() {
         return new KStreamBranchProcessor();
     }
 
@@ -40,7 +40,7 @@ class KStreamBranch<K, V> implements ProcessorDef<K, V> {
         @Override
         public void process(K key, V value) {
             for (int i = 0; i < predicates.length; i++) {
-                if (predicates[i].apply(key, value)) {
+                if (predicates[i].test(key, value)) {
                     // use forward with childIndex here and then break the loop
                     // so that no record is going to be piped to multiple streams
                     context().forward(key, value, i);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
index 22800f3..0b1f1e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamFilter<K, V> implements ProcessorDef<K, V> {
+class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
 
     private final Predicate<K, V> predicate;
     private final boolean filterOut;
@@ -33,14 +33,14 @@ class KStreamFilter<K, V> implements ProcessorDef<K, V> {
     }
 
     @Override
-    public Processor<K, V> instance() {
+    public Processor<K, V> get() {
         return new KStreamFilterProcessor();
     }
 
     private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
-            if (filterOut ^ predicate.apply(key, value)) {
+            if (filterOut ^ predicate.test(key, value)) {
                 context().forward(key, value);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index 6c7f4ea..175a002 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -21,9 +21,9 @@ import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
+class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
 
     private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
 
@@ -32,7 +32,7 @@ class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
     }
 
     @Override
-    public Processor<K1, V1> instance() {
+    public Processor<K1, V1> get() {
         return new KStreamFlatMapProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index 9cdcdf5..9b4559b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
+class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
 
     private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
 
@@ -31,7 +31,7 @@ class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
     }
 
     @Override
-    public Processor<K1, V1> instance() {
+    public Processor<K1, V1> get() {
         return new KStreamFlatMapValuesProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 52c717f..cff97d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,15 +19,15 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorDef;
-import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamWindowed;
+import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.streams.kstream.WindowSupplier;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.lang.reflect.Array;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -127,12 +127,12 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     }
 
     @Override
-    public KStreamWindowed<K, V> with(WindowDef<K, V> window) {
+    public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) {
         String name = WINDOWED_NAME + INDEX.getAndIncrement();
 
-        topology.addProcessor(name, new KStreamWindow<>(window), this.name);
+        topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name);
 
-        return new KStreamWindowedImpl<>(topology, name, window);
+        return new KStreamWindowedImpl<>(topology, name, windowSupplier);
     }
 
     @Override
@@ -191,10 +191,10 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> process(final ProcessorDef<K, V> processorDef) {
+    public <K1, V1> KStream<K1, V1> process(final ProcessorSupplier<K, V> processorSupplier) {
         String name = PROCESSOR_NAME + INDEX.getAndIncrement();
 
-        topology.addProcessor(name, processorDef, this.name);
+        topology.addProcessor(name, processorSupplier, this.name);
 
         return new KStreamImpl<>(topology, name);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
index 311efef..997953f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
@@ -22,11 +22,11 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.util.Iterator;
 
-class KStreamJoin<K, V, V1, V2> implements ProcessorDef<K, V1> {
+class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
 
     private static abstract class Finder<K, T> {
         abstract Iterator<T> find(K key, long timestamp);
@@ -41,7 +41,7 @@ class KStreamJoin<K, V, V1, V2> implements ProcessorDef<K, V1> {
     }
 
     @Override
-    public Processor<K, V1> instance() {
+    public Processor<K, V1> get() {
         return new KStreamJoinProcessor(windowName);
     }
 
@@ -66,6 +66,7 @@ class KStreamJoin<K, V, V1, V2> implements ProcessorDef<K, V1> {
             final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName);
 
             this.finder = new Finder<K, V2>() {
+                @Override
                 Iterator<V2> find(K key, long timestamp) {
                     return window.find(key, timestamp);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index a9a7b24..3868318 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -21,9 +21,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
+class KStreamMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
 
     private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
 
@@ -32,7 +32,7 @@ class KStreamMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
     }
 
     @Override
-    public Processor<K1, V1> instance() {
+    public Processor<K1, V1> get() {
         return new KStreamMapProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index ac39f37..692b421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -17,12 +17,12 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
+class KStreamMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
 
     private final ValueMapper<V1, V2> mapper;
 
@@ -31,7 +31,7 @@ class KStreamMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
     }
 
     @Override
-    public Processor<K1, V1> instance() {
+    public Processor<K1, V1> get() {
         return new KStreamMapProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
index 0f4638d..59a815b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
@@ -19,13 +19,13 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamPassThrough<K, V> implements ProcessorDef<K, V> {
+class KStreamPassThrough<K, V> implements ProcessorSupplier<K, V> {
 
     @Override
-    public Processor<K, V> instance() {
-        return new KStreamPassThroughProcessor();
+    public Processor<K, V> get() {
+        return new KStreamPassThroughProcessor<K, V>();
     }
 
     public class KStreamPassThroughProcessor<K, V> extends AbstractProcessor<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
index bdd1323..2923936 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
@@ -18,26 +18,26 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorDef;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-public class KStreamWindow<K, V> implements ProcessorDef<K, V> {
+public class KStreamWindow<K, V> implements ProcessorSupplier<K, V> {
 
-    private final WindowDef<K, V> windowDef;
+    private final WindowSupplier<K, V> windowSupplier;
 
-    KStreamWindow(WindowDef<K, V> windowDef) {
-        this.windowDef = windowDef;
+    KStreamWindow(WindowSupplier<K, V> windowSupplier) {
+        this.windowSupplier = windowSupplier;
     }
 
-    public WindowDef<K, V> window() {
-        return windowDef;
+    public WindowSupplier<K, V> window() {
+        return windowSupplier;
     }
 
     @Override
-    public Processor<K, V> instance() {
+    public Processor<K, V> get() {
         return new KStreamWindowProcessor();
     }
 
@@ -48,7 +48,7 @@ public class KStreamWindow<K, V> implements ProcessorDef<K, V> {
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
-            this.window = windowDef.instance();
+            this.window = windowSupplier.get();
             this.window.init(context);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
index a208af6..9316012 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -20,22 +20,22 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamWindowed;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 
 public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implements KStreamWindowed<K, V> {
 
-    private final WindowDef<K, V> windowDef;
+    private final WindowSupplier<K, V> windowSupplier;
 
-    public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowDef<K, V> windowDef) {
+    public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowSupplier<K, V> windowSupplier) {
         super(topology, name);
-        this.windowDef = windowDef;
+        this.windowSupplier = windowSupplier;
     }
 
     @Override
     public <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> valueJoiner) {
-        String thisWindowName = this.windowDef.name();
-        String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowDef.name();
+        String thisWindowName = this.windowSupplier.name();
+        String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowSupplier.name();
 
         KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
         KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
deleted file mode 100644
index a32a899..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-public interface ProcessorDef<K, V> {
-
-    Processor<K, V> instance();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
new file mode 100644
index 0000000..719d3ac
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public interface ProcessorSupplier<K, V> {
+
+    Processor<K, V> get();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a254c13..833e29b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -57,18 +57,17 @@ public class TopologyBuilder {
     private class ProcessorNodeFactory implements NodeFactory {
         public final String[] parents;
         private final String name;
-        private final ProcessorDef definition;
+        private final ProcessorSupplier supplier;
 
-        public ProcessorNodeFactory(String name, String[] parents, ProcessorDef definition) {
+        public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
             this.name = name;
             this.parents = parents.clone();
-            this.definition = definition;
+            this.supplier = supplier;
         }
 
         @Override
         public ProcessorNode build() {
-            Processor processor = definition.instance();
-            return new ProcessorNode(name, processor);
+            return new ProcessorNode(name, supplier.get());
         }
     }
 
@@ -123,7 +122,7 @@ public class TopologyBuilder {
      * {@link StreamingConfig streaming configuration}.
      *
      * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}.
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
@@ -136,7 +135,7 @@ public class TopologyBuilder {
      * The sink will use the specified key and value deserializers.
      *
      * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}.
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
      * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
      * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
      * {@link StreamingConfig streaming configuration}
@@ -216,12 +215,12 @@ public class TopologyBuilder {
      * Add a new processor node that receives and processes messages output by one or more parent source or processor node.
      * Any new messages output by this processor will be forwarded to its child processor or sink nodes.
      * @param name the unique name of the processor node
-     * @param definition the supplier used to obtain this node's {@link Processor} instance
+     * @param supplier the supplier used to obtain this node's {@link Processor} instance
      * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
      * and process
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) {
+    public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
         if (nodeNames.contains(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
@@ -237,7 +236,7 @@ public class TopologyBuilder {
         }
 
         nodeNames.add(name);
-        nodeFactories.add(new ProcessorNodeFactory(name, parentNames, definition));
+        nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index c18ddfe..40eba2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import java.lang.reflect.Array;
@@ -44,19 +44,19 @@ public class KStreamBranchTest {
 
         Predicate<Integer, String> isEven = new Predicate<Integer, String>() {
             @Override
-            public boolean apply(Integer key, String value) {
+            public boolean test(Integer key, String value) {
                 return (key % 2) == 0;
             }
         };
         Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
             @Override
-            public boolean apply(Integer key, String value) {
+            public boolean test(Integer key, String value) {
                 return (key % 3) == 0;
             }
         };
         Predicate<Integer, String> isOdd = new Predicate<Integer, String>() {
             @Override
-            public boolean apply(Integer key, String value) {
+            public boolean test(Integer key, String value) {
                 return (key % 2) != 0;
             }
         };
@@ -65,16 +65,16 @@ public class KStreamBranchTest {
 
         KStream<Integer, String> stream;
         KStream<Integer, String>[] branches;
-        MockProcessorDef<Integer, String>[] processors;
+        MockProcessorSupplier<Integer, String>[] processors;
 
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
         branches = stream.branch(isEven, isMultipleOfThree, isOdd);
 
         assertEquals(3, branches.length);
 
-        processors = (MockProcessorDef<Integer, String>[]) Array.newInstance(MockProcessorDef.class, branches.length);
+        processors = (MockProcessorSupplier<Integer, String>[]) Array.newInstance(MockProcessorSupplier.class, branches.length);
         for (int i = 0; i < branches.length; i++) {
-            processors[i] = new MockProcessorDef<>();
+            processors[i] = new MockProcessorSupplier<>();
             branches[i].process(processors[i]);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index b80e1e2..d1e5d38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 
 import org.junit.Test;
 
@@ -38,7 +38,7 @@ public class KStreamFilterTest {
 
     private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
-        public boolean apply(Integer key, String value) {
+        public boolean test(Integer key, String value) {
             return (key % 3) == 0;
         }
     };
@@ -49,9 +49,9 @@ public class KStreamFilterTest {
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         KStream<Integer, String> stream;
-        MockProcessorDef<Integer, String> processor;
+        MockProcessorSupplier<Integer, String> processor;
 
-        processor = new MockProcessorDef<>();
+        processor = new MockProcessorSupplier<>();
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
         stream.filter(isMultipleOfThree).process(processor);
 
@@ -69,9 +69,9 @@ public class KStreamFilterTest {
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         KStream<Integer, String> stream;
-        MockProcessorDef<Integer, String> processor;
+        MockProcessorSupplier<Integer, String> processor;
 
-        processor = new MockProcessorDef<>();
+        processor = new MockProcessorSupplier<>();
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
         stream.filterOut(isMultipleOfThree).process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index e87223e..61b5ccd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -57,9 +57,9 @@ public class KStreamFlatMapTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         KStream<Integer, String> stream;
-        MockProcessorDef<String, String> processor;
+        MockProcessorSupplier<String, String> processor;
 
-        processor = new MockProcessorDef<>();
+        processor = new MockProcessorSupplier<>();
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
         stream.flatMap(mapper).process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 09dda65..66faf07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -55,9 +55,9 @@ public class KStreamFlatMapValuesTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         KStream<Integer, String> stream;
-        MockProcessorDef<Integer, String> processor;
+        MockProcessorSupplier<Integer, String> processor;
 
-        processor = new MockProcessorDef<>();
+        processor = new MockProcessorSupplier<>();
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
         stream.flatMapValues(mapper).process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 0660ddd..875712a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.UnlimitedWindowDef;
 import org.junit.Test;
 
@@ -47,12 +47,12 @@ public class KStreamImplTest {
         KStream<String, String> stream1 =
             source1.filter(new Predicate<String, String>() {
                 @Override
-                public boolean apply(String key, String value) {
+                public boolean test(String key, String value) {
                     return true;
                 }
             }).filterOut(new Predicate<String, String>() {
                 @Override
-                public boolean apply(String key, String value) {
+                public boolean test(String key, String value) {
                     return false;
                 }
             });
@@ -74,13 +74,13 @@ public class KStreamImplTest {
         KStream<String, Integer>[] streams2 = stream2.branch(
             new Predicate<String, Integer>() {
                 @Override
-                public boolean apply(String key, Integer value) {
+                public boolean test(String key, Integer value) {
                     return (value % 2) == 0;
                 }
             },
             new Predicate<String, Integer>() {
                 @Override
-                public boolean apply(String key, Integer value) {
+                public boolean test(String key, Integer value) {
                     return true;
                 }
             }
@@ -89,13 +89,13 @@ public class KStreamImplTest {
         KStream<String, Integer>[] streams3 = stream3.branch(
             new Predicate<String, Integer>() {
                 @Override
-                public boolean apply(String key, Integer value) {
+                public boolean test(String key, Integer value) {
                     return (value % 2) == 0;
                 }
             },
             new Predicate<String, Integer>() {
                 @Override
-                public boolean apply(String key, Integer value) {
+                public boolean test(String key, Integer value) {
                     return true;
                 }
             }
@@ -119,7 +119,7 @@ public class KStreamImplTest {
 
         stream4.to("topic-5");
 
-        stream5.through("topic-6").process(new MockProcessorDef<>()).to("topic-7");
+        stream5.through("topic-6").process(new MockProcessorSupplier<>()).to("topic-7");
 
         assertEquals(2 + // sources
             2 + // stream1

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
index 7dea8e0..58899fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.UnlimitedWindowDef;
 import org.junit.Test;
 
@@ -90,10 +90,10 @@ public class KStreamJoinTest {
         KStream<Integer, String> stream2;
         KStreamWindowed<Integer, String> windowed1;
         KStreamWindowed<Integer, String> windowed2;
-        MockProcessorDef<Integer, String> processor;
+        MockProcessorSupplier<Integer, String> processor;
         String[] expected;
 
-        processor = new MockProcessorDef<>();
+        processor = new MockProcessorSupplier<>();
         stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
         stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
         windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index bec524f..2ae8a97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -51,9 +51,9 @@ public class KStreamMapTest {
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
         KStream<Integer, String> stream;
-        MockProcessorDef<String, Integer> processor;
+        MockProcessorSupplier<String, Integer> processor;
 
-        processor = new MockProcessorDef<>();
+        processor = new MockProcessorSupplier<>();
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
         stream.map(mapper).process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index b6507fe..f830c00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -50,7 +50,7 @@ public class KStreamMapValuesTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, String> stream;
-        MockProcessorDef<Integer, Integer> processor = new MockProcessorDef<>();
+        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
         stream.mapValues(mapper).process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
index 48a9fc3..c3dc7e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.UnlimitedWindowDef;
 import org.junit.Test;
@@ -46,11 +46,11 @@ public class KStreamWindowedTest {
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
         KStream<Integer, String> stream;
-        WindowDef<Integer, String> windowDef;
+        WindowSupplier<Integer, String> windowSupplier;
 
-        windowDef = new UnlimitedWindowDef<>(windowName);
+        windowSupplier = new UnlimitedWindowDef<>(windowName);
         stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.with(windowDef);
+        stream.with(windowSupplier);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);
         Window<Integer, String> window = (Window<Integer, String>) driver.getStateStore(windowName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 57a78ff..00522d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 public class TopologyBuilderTest {
@@ -45,22 +45,22 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
 
         builder.addSource("source", "topic-1");
-        builder.addProcessor("processor", new MockProcessorDef(), "source");
-        builder.addProcessor("processor", new MockProcessorDef(), "source");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
     }
 
     @Test(expected = TopologyException.class)
     public void testAddProcessorWithWrongParent() {
         final TopologyBuilder builder = new TopologyBuilder();
 
-        builder.addProcessor("processor", new MockProcessorDef(), "source");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
     }
 
     @Test(expected = TopologyException.class)
     public void testAddProcessorWithSelfParent() {
         final TopologyBuilder builder = new TopologyBuilder();
 
-        builder.addProcessor("processor", new MockProcessorDef(), "processor");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
     }
 
     @Test(expected = TopologyException.class)


Mime
View raw message