kafka-commits mailing list archives

From gwens...@apache.org
Subject [2/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams
Date Fri, 22 Jan 2016 21:00:10 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 578357a..e5d0922 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -39,8 +39,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -67,14 +67,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class StreamThread extends Thread {
 
     private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
-    private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
+    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
     public final String jobId;
     public final String clientId;
     public final UUID processId;
 
-    protected final StreamingConfig config;
+    protected final StreamsConfig config;
     protected final TopologyBuilder builder;
     protected final Set<String> sourceTopics;
     protected final Producer<byte[], byte[]> producer;
@@ -93,9 +93,9 @@ public class StreamThread extends Thread {
     private final long cleanTimeMs;
     private final long commitTimeMs;
     private final long totalRecordsToProcess;
-    private final StreamingMetricsImpl sensors;
+    private final StreamsMetricsImpl sensors;
 
-    private KafkaStreamingPartitionAssignor partitionAssignor = null;
+    private StreamPartitionAssignor partitionAssignor = null;
 
     private long lastClean;
     private long lastCommit;
@@ -122,17 +122,17 @@ public class StreamThread extends Thread {
     };
 
     public StreamThread(TopologyBuilder builder,
-                        StreamingConfig config,
+                        StreamsConfig config,
                         String jobId,
                         String clientId,
                         UUID processId,
                         Metrics metrics,
-                        Time time) throws Exception {
+                        Time time) {
         this(builder, config, null , null, null, jobId, clientId, processId, metrics, time);
     }
 
     StreamThread(TopologyBuilder builder,
-                 StreamingConfig config,
+                 StreamsConfig config,
                  Producer<byte[], byte[]> producer,
                  Consumer<byte[], byte[]> consumer,
                  Consumer<byte[], byte[]> restoreConsumer,
@@ -140,8 +140,8 @@ public class StreamThread extends Thread {
                  String clientId,
                  UUID processId,
                  Metrics metrics,
-                 Time time) throws Exception {
-        super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
+                 Time time) {
+        super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
         this.jobId = jobId;
         this.config = config;
@@ -149,7 +149,7 @@ public class StreamThread extends Thread {
         this.sourceTopics = builder.sourceTopics();
         this.clientId = clientId;
         this.processId = processId;
-        this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+        this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
 
         // set the producer and consumer clients
         this.producer = (producer != null) ? producer : createProducer();
@@ -167,24 +167,24 @@ public class StreamThread extends Thread {
         this.standbyRecords = new HashMap<>();
 
         // read in task specific config values
-        this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
+        this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
         this.stateDir.mkdir();
-        this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG);
-        this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG);
-        this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
-        this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS);
+        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+        this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+        this.totalRecordsToProcess = config.getLong(StreamsConfig.TOTAL_RECORDS_TO_PROCESS);
 
         this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
         this.lastCommit = time.milliseconds();
         this.recordsProcessed = 0;
         this.time = time;
 
-        this.sensors = new StreamingMetricsImpl(metrics);
+        this.sensors = new StreamsMetricsImpl(metrics);
 
         this.running = new AtomicBoolean(true);
     }
 
-    public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
+    public void partitionAssignor(StreamPartitionAssignor partitionAssignor) {
         this.partitionAssignor = partitionAssignor;
     }
 
@@ -227,7 +227,7 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Shutdown this streaming thread.
+     * Shutdown this stream thread.
      */
     public void close() {
         running.set(false);
@@ -673,7 +673,7 @@ public class StreamThread extends Thread {
         }
     }
 
-    private class StreamingMetricsImpl implements StreamingMetrics {
+    private class StreamsMetricsImpl implements StreamsMetrics {
         final Metrics metrics;
         final String metricGrpName;
         final Map<String, String> metricTags;
@@ -685,10 +685,10 @@ public class StreamThread extends Thread {
         final Sensor taskCreationSensor;
         final Sensor taskDestructionSensor;
 
-        public StreamingMetricsImpl(Metrics metrics) {
+        public StreamsMetricsImpl(Metrics metrics) {
 
             this.metrics = metrics;
-            this.metricGrpName = "streaming-metrics";
+            this.metricGrpName = "stream-metrics";
             this.metricTags = new LinkedHashMap<>();
             this.metricTags.put("client-id", clientId + "-" + getName());
 
@@ -734,7 +734,7 @@ public class StreamThread extends Thread {
             for (int i = 0; i < tags.length; i += 2)
                 tagMap.put(tags[i], tags[i + 1]);
 
-            String metricGroupName = "streaming-" + scopeName + "-metrics";
+            String metricGroupName = "stream-" + scopeName + "-metrics";
 
             // first add the global operation metrics if not yet, with the global tags only
             Sensor parent = metrics.sensor(scopeName + "-" + operationName);
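
For readers tracking the rename, a minimal sketch of configuring the renamed StreamsConfig with keys touched in this commit (it mirrors the new StreamsConfigTest further down; the property values are illustrative placeholders, not part of the commit):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.examples.WallclockTimestampExtractor;

    public class StreamsConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Keys renamed from StreamingConfig.* to StreamsConfig.* in this commit.
            props.put(StreamsConfig.JOB_ID_CONFIG, "example-job");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
            StreamsConfig config = new StreamsConfig(props);
            // Task-level settings read by StreamThread in the hunk above.
            long pollMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
            long commitMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
            System.out.println("poll=" + pollMs + "ms, commit=" + commitMs + "ms");
        }
    }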

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
deleted file mode 100644
index 183b691..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-public class Entry<K, V> {
-
-    private final K key;
-    private final V value;
-
-    public Entry(K key, V value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public K key() {
-        return key;
-    }
-
-    public V value() {
-        return value;
-    }
-
-    public String toString() {
-        return "Entry(" + key() + ", " + value() + ")";
-    }
-
-}
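
The deleted Entry<K, V> is superseded by org.apache.kafka.streams.KeyValue, which the updated call sites in this commit access through public key and value fields (entry.key, entry.value) rather than key()/value() accessors. A minimal reconstruction of that shape, inferred from the hunks below rather than copied from the commit (the real class may carry additional members):

    package org.apache.kafka.streams;

    // Illustrative sketch of the replacement type. Public final fields match
    // the entry.key / entry.value usage at the call sites in this commit.
    public class KeyValue<K, V> {

        public final K key;
        public final V value;

        public KeyValue(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "KeyValue(" + key + ", " + value + ")";
        }
    }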

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index 0fbd4ae..bd118a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -19,10 +19,12 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.KeyValue;
+
 import java.io.Closeable;
 import java.util.Iterator;
 
-public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable {
+public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closeable {
 
     @Override
     public void close();
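
Since KeyValueIterator now yields KeyValue<K, V> and still extends Closeable, callers should release it when done. A usage sketch, assuming a hypothetical KeyValueStore<String, Long> with a range() query (the store and bounds are illustrative, not from the commit):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class IterationSketch {
        // Iterate a KeyValueIterator and always close it to release underlying
        // resources (e.g. a RocksDB iterator).
        static void dump(KeyValueStore<String, Long> store) {
            KeyValueIterator<String, Long> iter = store.range("a", "z");
            try {
                while (iter.hasNext()) {
                    KeyValue<String, Long> entry = iter.next();
                    System.out.println(entry.key + " -> " + entry.value);
                }
            } finally {
                iter.close();
            }
        }
    }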

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index e4faed1..d448044 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.List;
@@ -55,7 +56,7 @@ public interface KeyValueStore<K, V> extends StateStore {
      * @param entries A list of entries to put into the store.
      * @throws NullPointerException If null is used for any key or value.
      */
-    abstract public void putAll(List<Entry<K, V>> entries);
+    abstract public void putAll(List<KeyValue<K, V>> entries);
 
     /**
      * Delete the value from the store (if there is one)
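
With this signature change, batch writes take a List<KeyValue<K, V>> built with the new type's constructor. A brief sketch (store is again a hypothetical KeyValueStore<String, Long>; keys and values are illustrative):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class PutAllSketch {
        // Batch write under the new putAll(List<KeyValue<K, V>>) signature.
        static void load(KeyValueStore<String, Long> store) {
            List<KeyValue<String, Long>> entries = Arrays.asList(
                    new KeyValue<>("alpha", 1L),
                    new KeyValue<>("beta", 2L));
            store.putAll(entries);
        }
    }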

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index 55d1ac3..08cd049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -19,7 +19,7 @@
 
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 286db1b..4856b09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -18,10 +18,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -97,9 +97,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
+        public void putAll(List<KeyValue<K, V>> entries) {
+            for (KeyValue<K, V> entry : entries)
+                put(entry.key, entry.value);
         }
 
         @Override
@@ -140,9 +140,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
             }
 
             @Override
-            public Entry<K, V> next() {
+            public KeyValue<K, V> next() {
                 Map.Entry<K, V> entry = iter.next();
-                return new Entry<>(entry.getKey(), entry.getValue());
+                return new KeyValue<>(entry.getKey(), entry.getValue());
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 6a38423..22ee3f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -131,9 +131,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
+        public void putAll(List<KeyValue<K, V>> entries) {
+            for (KeyValue<K, V> entry : entries)
+                put(entry.key, entry.value);
         }
 
         @Override
@@ -179,9 +179,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
             }
 
             @Override
-            public Entry<K, V> next() {
+            public KeyValue<K, V> next() {
                 lastKey = keys.next();
-                return new Entry<>(lastKey, entries.get(lastKey));
+                return new KeyValue<>(lastKey, entries.get(lastKey));
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 21f73b0..d5fe44a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -18,13 +18,13 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -47,7 +47,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private Sensor rangeTime;
     private Sensor flushTime;
     private Sensor restoreTime;
-    private StreamingMetrics metrics;
+    private StreamsMetrics metrics;
 
     private boolean loggingEnabled = true;
     private StoreChangeLogger<K, V> changeLogger = null;
@@ -141,14 +141,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void putAll(List<Entry<K, V>> entries) {
+    public void putAll(List<KeyValue<K, V>> entries) {
         long startNs = time.nanoseconds();
         try {
             this.inner.putAll(entries);
 
             if (loggingEnabled) {
-                for (Entry<K, V> entry : entries) {
-                    K key = entry.key();
+                for (KeyValue<K, V> entry : entries) {
+                    K key = entry.key;
                     changeLogger.add(key);
                 }
                 changeLogger.maybeLogChange(this.getter);
@@ -231,7 +231,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         }
 
         @Override
-        public Entry<K1, V1> next() {
+        public KeyValue<K1, V1> next() {
             return iter.next();
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 821927d..862c322 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.state.Serdes;
@@ -40,7 +40,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     private Sensor rangeTime;
     private Sensor flushTime;
     private Sensor restoreTime;
-    private StreamingMetrics metrics;
+    private StreamsMetrics metrics;
 
     private boolean loggingEnabled = true;
     private StoreChangeLogger<byte[], byte[]> changeLogger = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8a600f9..6c77ab2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -18,8 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -147,9 +147,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void putAll(List<Entry<K, V>> entries) {
-        for (Entry<K, V> entry : entries)
-            put(entry.key(), entry.value());
+    public void putAll(List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries)
+            put(entry.key, entry.value);
     }
 
     @Override
@@ -200,8 +200,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             return iter.key();
         }
 
-        protected Entry<K, V> getEntry() {
-            return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+        protected KeyValue<K, V> getKeyValue() {
+            return new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
         }
 
         @Override
@@ -210,11 +210,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
 
         @Override
-        public Entry<K, V> next() {
+        public KeyValue<K, V> next() {
             if (!hasNext())
                 throw new NoSuchElementException();
 
-            Entry<K, V> entry = this.getEntry();
+            KeyValue<K, V> entry = this.getKeyValue();
             iter.next();
             return entry;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 933ed91..d854c92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,9 +20,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -86,10 +85,10 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (index >= iterators.length)
                 throw new NoSuchElementException();
 
-            Entry<byte[], byte[]> entry = iterators[index].next();
+            KeyValue<byte[], byte[]> kv = iterators[index].next();
 
-            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
-                                  serdes.valueFrom(entry.value()));
+            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(kv.key),
+                                  serdes.valueFrom(kv.value));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
deleted file mode 100644
index 3b3fc9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Map;
-import java.util.Properties;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-
-
-public class StreamingConfigTest {
-
-    private Properties props = new Properties();
-    private StreamingConfig streamingConfig;
-    private StreamThread streamThreadPlaceHolder;
-
-
-    @Before
-    public void setUp() {
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        streamingConfig = new StreamingConfig(props);
-    }
-
-    @Test
-    public void testGetProducerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getProducerConfigs("client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
-    }
-
-    @Test
-    public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
-        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
-
-    }
-
-    @Test
-    public void testGetRestoreConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs("client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
-        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
new file mode 100644
index 0000000..777fae5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+
+public class StreamsConfigTest {
+
+    private Properties props = new Properties();
+    private StreamsConfig streamsConfig;
+    private StreamThread streamThreadPlaceHolder;
+
+
+    @Before
+    public void setUp() {
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        streamsConfig = new StreamsConfig(props);
+    }
+
+    @Test
+    public void testGetProducerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
+    }
+
+    @Test
+    public void testGetConsumerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
+        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
+
+    }
+
+    @Test
+    public void testGetRestoreConsumerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
+        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index a55fd30..693f58e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 880adce..f226cee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 0f7cb6a..73c517b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 0b7b1e7..426259f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index aa09e74..12bfb9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 1527f17..e3cf22b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 67b83f5..feabc08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
deleted file mode 100644
index 1b8cbb8..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowedStreamPartitionerTest {
-
-    private String topicName = "topic";
-
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    @Test
-    public void testCopartitioning() {
-
-        Random rand = new Random();
-
-        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
-
-        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
-        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
-
-        for (int k = 0; k < 10; k++) {
-            Integer key = rand.nextInt();
-            byte[] keyBytes = keySerializer.serialize(topicName, key);
-
-            String value = key.toString();
-            byte[] valueBytes = valSerializer.serialize(topicName, value);
-
-            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
-
-            for (int w = 0; w < 10; w++) {
-                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
-
-                Windowed<Integer> windowedKey = new Windowed<>(key, window);
-                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
-
-                assertEquals(expected, actual);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
new file mode 100644
index 0000000..18494fd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStreamsPartitionerTest {
+
+    private String topicName = "topic";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+
+    private List<PartitionInfo> infos = Arrays.asList(
+            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
+    );
+
+    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+    @Test
+    public void testCopartitioning() {
+
+        Random rand = new Random();
+
+        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+
+        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+        WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer);
+
+        for (int k = 0; k < 10; k++) {
+            Integer key = rand.nextInt();
+            byte[] keyBytes = keySerializer.serialize(topicName, key);
+
+            String value = key.toString();
+            byte[] valueBytes = valSerializer.serialize(topicName, value);
+
+            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
+
+            for (int w = 0; w < 10; w++) {
+                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+
+                Windowed<Integer> windowedKey = new Windowed<>(key, window);
+                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
+
+                assertEquals(expected, actual);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
deleted file mode 100644
index 92d7b6a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-
-public class KafkaStreamingPartitionAssignorTest {
-
-    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
-    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
-    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
-    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
-    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
-    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
-    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
-    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
-    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
-    private TopicPartition t3p3 = new TopicPartition("topic3", 3);
-
-    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    private final TaskId task0 = new TaskId(0, 0);
-    private final TaskId task1 = new TaskId(0, 1);
-    private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(0, 3);
-
-    private Properties configProps() {
-        return new Properties() {
-            {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-            }
-        };
-    }
-
-    private ByteArraySerializer serializer = new ByteArraySerializer();
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSubscription() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-
-        final Set<TaskId> prevTasks = Utils.mkSet(
-                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
-        final Set<TaskId> cachedTasks = Utils.mkSet(
-                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
-                new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
-
-        String clientId = "client-id";
-        UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
-            @Override
-            public Set<TaskId> prevTasks() {
-                return prevTasks;
-            }
-            @Override
-            public Set<TaskId> cachedTasks() {
-                return cachedTasks;
-            }
-        };
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
-
-        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
-
-        Collections.sort(subscription.topics());
-        assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
-
-        Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
-        standbyTasks.removeAll(prevTasks);
-
-        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
-        assertEquals(info.encode(), subscription.userData());
-    }
-
-    @Test
-    public void testAssignBasic() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
-
-        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
-        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
-        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
-        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        // check assigned partitions
-        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
-                Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
-        assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
-
-        // check assignment info
-
-        Set<TaskId> allActiveTasks = new HashSet<>();
-
-        // the first consumer
-        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
-
-        // the second consumer
-        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
-
-        assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
-
-        // the third consumer
-        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
-
-        assertEquals(3, allActiveTasks.size());
-        assertEquals(allTasks, allActiveTasks);
-    }
-
-    @Test
-    public void testAssignWithNewTasks() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addSource("source3", "topic3");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
-        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
-
-        // assuming that previous tasks do not have topic3
-        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
-        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
-        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        // check assigned partitions: since there is no previous task for topic3, its partitions are assigned randomly, so we cannot check for an exact match;
-        // also note that previously assigned partitions / tasks may not stay on the previous host, since we may assign the new tasks first and
-        // then re-assign earlier ones to other hosts for load balancing
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TopicPartition> allPartitions = new HashSet<>();
-        AssignmentInfo info;
-
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        allActiveTasks.addAll(info.activeTasks);
-        allPartitions.addAll(assignments.get("consumer10").partitions());
-
-        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        allActiveTasks.addAll(info.activeTasks);
-        allPartitions.addAll(assignments.get("consumer11").partitions());
-
-        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        allActiveTasks.addAll(info.activeTasks);
-        allPartitions.addAll(assignments.get("consumer20").partitions());
-
-        assertEquals(allTasks, allActiveTasks);
-        assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
-    }
-
-    @Test
-    public void testAssignWithStates() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
-
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
-        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
-
-        List<String> topics = Utils.mkList("topic1", "topic2");
-
-        TaskId task00 = new TaskId(0, 0);
-        TaskId task01 = new TaskId(0, 1);
-        TaskId task02 = new TaskId(0, 2);
-        TaskId task10 = new TaskId(1, 0);
-        TaskId task11 = new TaskId(1, 1);
-        TaskId task12 = new TaskId(1, 2);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        // check assigned partition size: since there is no previous task and there are two sub-topologies, the assignment is random, so we cannot check for an exact match
-        assertEquals(2, assignments.get("consumer10").partitions().size());
-        assertEquals(2, assignments.get("consumer11").partitions().size());
-        assertEquals(2, assignments.get("consumer20").partitions().size());
-
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
-
-        // check tasks for state topics
-        assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
-    }
-
-    @Test
-    public void testAssignWithStandbyReplicas() throws Exception {
-        Properties props = configProps();
-        props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
-        StreamingConfig config = new StreamingConfig(props);
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
-
-
-        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
-        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
-        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
-        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TaskId> allStandbyTasks = new HashSet<>();
-
-        // the first consumer
-        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
-        allStandbyTasks.addAll(info10.standbyTasks.keySet());
-
-        // the second consumer
-        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
-        allStandbyTasks.addAll(info11.standbyTasks.keySet());
-
-        // check active tasks assigned to the first client
-        assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
-        assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
-
-        // the third consumer
-        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
-        allStandbyTasks.addAll(info20.standbyTasks.keySet());
-
-        // all task ids are in the active tasks and also in the standby tasks
-
-        assertEquals(3, allActiveTasks.size());
-        assertEquals(allTasks, allActiveTasks);
-
-        assertEquals(3, allStandbyTasks.size());
-        assertEquals(allTasks, allStandbyTasks);
-    }
-
-    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
-
-        // This assumes 1) DefaultPartitionGrouper is used, and 2) there is only one topic group.
-
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-
-        // check if the number of assigned partitions == the size of active task id list
-        assertEquals(assignment.partitions().size(), info.activeTasks.size());
-
-        // check if active tasks are consistent
-        List<TaskId> activeTasks = new ArrayList<>();
-        Set<String> activeTopics = new HashSet<>();
-        for (TopicPartition partition : assignment.partitions()) {
-            // since default grouper, taskid.partition == partition.partition()
-            activeTasks.add(new TaskId(0, partition.partition()));
-            activeTopics.add(partition.topic());
-        }
-        assertEquals(activeTasks, info.activeTasks);
-
-        // check if active partitions cover all topics
-        assertEquals(allTopics, activeTopics);
-
-        // check if standby tasks are consistent
-        Set<String> standbyTopics = new HashSet<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
-            TaskId id = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
-            for (TopicPartition partition : partitions) {
-                // since default grouper, taskid.partition == partition.partition()
-                assertEquals(id.partition, partition.partition());
-
-                standbyTopics.add(partition.topic());
-            }
-        }
-
-        if (info.standbyTasks.size() > 0)
-            // check if standby partitions cover all topics
-            assertEquals(allTopics, standbyTopics);
-
-        return info;
-    }
-
-    @Test
-    public void testOnAssignment() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopicPartition t2p3 = new TopicPartition("topic2", 3);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-
-        UUID uuid = UUID.randomUUID();
-        String client1 = "client1";
-
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
-
-        List<TaskId> activeTaskList = Utils.mkList(task0, task3);
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
-        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
-
-        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
-        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
-        partitionAssignor.onAssignment(assignment);
-
-        assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
-        assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
-        assertEquals(standbyTasks, partitionAssignor.standbyTasks());
-    }
-
-}
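
For readers tracking the rename, the deleted assignor test above pins down the
rebalance metadata contract: each member advertises its previously active tasks
and its standby candidates (cached tasks minus active ones), encoded into the
subscription userData. A minimal sketch of that round-trip, reusing only the
types and calls that appear in the test itself (everything else here is
illustrative, not part of the commit):

    UUID processId = UUID.randomUUID();
    Set<TaskId> prevTasks = Utils.mkSet(new TaskId(0, 0));
    Set<TaskId> cachedTasks = Utils.mkSet(new TaskId(0, 0), new TaskId(0, 1));

    // standby candidates are the cached tasks that are not already active
    Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
    standbyTasks.removeAll(prevTasks);

    SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
    ByteBuffer userData = info.encode(); // what the assignor places in the subscription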

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index f2ef2ea..60bd309 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -27,12 +27,12 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -60,21 +60,22 @@ public class ProcessorTopologyTest {
     private static long timestamp = 1000L;
 
     private ProcessorTopologyTestDriver driver;
-    private StreamingConfig config;
+    private StreamsConfig config;
 
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
         File localState = StateUtils.tempDir();
         Properties props = new Properties();
-        props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamingConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
-        props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
-        props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        this.config = new StreamingConfig(props);
+        props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
+        props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+        props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        this.config = new StreamsConfig(props);
     }
 
     @After
@@ -193,8 +194,8 @@ public class ProcessorTopologyTest {
         assertNull(driver.readOutput(topic));
     }
 
-    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
-        return new StreamPartitioner<K, V>() {
+    protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) {
+        return new StreamsPartitioner<K, V>() {
             @Override
             public Integer partition(K key, V value, int numPartitions) {
                 return partition;
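
The constantPartitioner helper above shows the full surface of the renamed
StreamsPartitioner interface: one partition(key, value, numPartitions) method
returning the chosen partition. A hedged sketch of a key-hashing variant, with
the interface name and method signature taken from this diff and the hashing
scheme purely illustrative:

    StreamsPartitioner<String, String> byKeyHash = new StreamsPartitioner<String, String>() {
        @Override
        public Integer partition(String key, String value, int numPartitions) {
            // mask the sign bit so the modulo always yields a valid partition index
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    };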

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 85a8a15..fd604b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -85,17 +85,18 @@ public class StandbyTaskTest {
             )
     );
 
-    private StreamingConfig createConfig(final File baseDir) throws Exception {
-        return new StreamingConfig(new Properties() {
+    private StreamsConfig createConfig(final File baseDir) throws Exception {
+        return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
             }
         });
     }
@@ -130,7 +131,7 @@ public class StandbyTaskTest {
     public void testStorePartitions() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
@@ -145,7 +146,7 @@ public class StandbyTaskTest {
     public void testUpdateNonPersistentStore() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -164,7 +165,7 @@ public class StandbyTaskTest {
     public void testUpdate() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -227,7 +228,7 @@ public class StandbyTaskTest {
                     new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
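
Note that both test diffs add StreamsConfig.JOB_ID_CONFIG alongside the renamed
constants, which suggests a job id is now expected when building a
StreamsConfig. A minimal configuration sketch using only keys that appear in
these diffs (the job id and broker address values are placeholders):

    Properties props = new Properties();
    props.setProperty(StreamsConfig.JOB_ID_CONFIG, "my-stream-job");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    StreamsConfig config = new StreamsConfig(props);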

