kafka-commits mailing list archives

From: gwens...@apache.org
Subject: [1/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams
Date: Fri, 22 Jan 2016 21:00:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 91ba074e4 -> 21c6cfe50


http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
new file mode 100644
index 0000000..15b114a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamPartitionAssignorTest {
+
+    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
+    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
+    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
+    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
+    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
+    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
+    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
+    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
+    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
+    private TopicPartition t3p3 = new TopicPartition("topic3", 3);
+
+    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
+
+    private List<PartitionInfo> infos = Arrays.asList(
+            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
+    );
+
+    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+    private final TaskId task0 = new TaskId(0, 0);
+    private final TaskId task1 = new TaskId(0, 1);
+    private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(0, 3);
+
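+    // Minimal Streams configuration shared by these tests; the bootstrap
+    // address is a placeholder since only mock clients are used.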
+    private Properties configProps() {
+        return new Properties() {
+            {
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+            }
+        };
+    }
+
+    private ByteArraySerializer serializer = new ByteArraySerializer();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSubscription() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+
+        final Set<TaskId> prevTasks = Utils.mkSet(
+                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
+        final Set<TaskId> cachedTasks = Utils.mkSet(
+                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
+                new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
+
+        String clientId = "client-id";
+        UUID processId = UUID.randomUUID();
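+        // override prevTasks() and cachedTasks() so the thread reports pre-existing local state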
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
+            @Override
+            public Set<TaskId> prevTasks() {
+                return prevTasks;
+            }
+            @Override
+            public Set<TaskId> cachedTasks() {
+                return cachedTasks;
+            }
+        };
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
+
+        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
+
+        Collections.sort(subscription.topics());
+        assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
+
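+        // standby candidates are the cached tasks that were not previously active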
+        Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
+        standbyTasks.removeAll(prevTasks);
+
+        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
+        assertEquals(info.encode(), subscription.userData());
+    }
+
+    @Test
+    public void testAssignBasic() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
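+        // each consumer previously owned one active task and one standby task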
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
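+        // two consumers on process uuid1 and one on process uuid2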
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assigned partitions
+        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
+                Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+        assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
+
+        // check assignment info
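+        // (checkAssignment() decodes the userData and validates its consistency)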
+
+        Set<TaskId> allActiveTasks = new HashSet<>();
+
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
+
+        assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
+
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, allActiveTasks);
+    }
+
+    @Test
+    public void testAssignWithNewTasks() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addSource("source3", "topic3");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
+        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
+
+        // assume that the previously assigned tasks did not include topic3
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check the assigned partitions: since there are no previous tasks for topic3, its partitions are assigned arbitrarily, so we cannot check for an exact match;
+        // also note that previously assigned partitions / tasks may not stay on their previous hosts, since new tasks may be assigned first and
+        // the later ones then re-assigned to other hosts for load balancing
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        Set<TopicPartition> allPartitions = new HashSet<>();
+        AssignmentInfo info;
+
+        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer10").partitions());
+
+        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer11").partitions());
+
+        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer20").partitions());
+
+        assertEquals(allTasks, allActiveTasks);
+        assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
+    }
+
+    @Test
+    public void testAssignWithStates() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+
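+        // two sub-topologies, each with its own state store(s)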
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
+        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
+
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
+        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
+        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
+
+        List<String> topics = Utils.mkList("topic1", "topic2");
+
+        TaskId task00 = new TaskId(0, 0);
+        TaskId task01 = new TaskId(0, 1);
+        TaskId task02 = new TaskId(0, 2);
+        TaskId task10 = new TaskId(1, 0);
+        TaskId task11 = new TaskId(1, 1);
+        TaskId task12 = new TaskId(1, 2);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check the assigned partition sizes: since there are no previous tasks and there are two sub-topologies, the assignment is random, so we cannot check for an exact match
+        assertEquals(2, assignments.get("consumer10").partitions().size());
+        assertEquals(2, assignments.get("consumer11").partitions().size());
+        assertEquals(2, assignments.get("consumer20").partitions().size());
+
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+
+        // check tasks for state topics
+        assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
+        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
+        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicas() throws Exception {
+        Properties props = configProps();
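+        // request one standby replica for each active task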
+        props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+        StreamsConfig config = new StreamsConfig(props);
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        Set<TaskId> allStandbyTasks = new HashSet<>();
+
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+        allStandbyTasks.addAll(info10.standbyTasks.keySet());
+
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
+        allStandbyTasks.addAll(info11.standbyTasks.keySet());
+
+        // check active tasks assigned to the first client
+        assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+        assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
+
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
+        allStandbyTasks.addAll(info20.standbyTasks.keySet());
+
+        // all task ids should appear in both the active tasks and the standby tasks
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, allActiveTasks);
+
+        assertEquals(3, allStandbyTasks.size());
+        assertEquals(allTasks, allStandbyTasks);
+    }
+
+    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+        // This assumes that 1) the DefaultPartitionGrouper is used, and 2) there is only one topic group.
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+        // check if the number of assigned partitions == the size of the active task id list
+        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+        // check if active tasks are consistent
+        List<TaskId> activeTasks = new ArrayList<>();
+        Set<String> activeTopics = new HashSet<>();
+        for (TopicPartition partition : assignment.partitions()) {
+            // with the default grouper, taskId.partition == partition.partition()
+            activeTasks.add(new TaskId(0, partition.partition()));
+            activeTopics.add(partition.topic());
+        }
+        assertEquals(activeTasks, info.activeTasks);
+
+        // check if active partitions cover all topics
+        assertEquals(allTopics, activeTopics);
+
+        // check if standby tasks are consistent
+        Set<String> standbyTopics = new HashSet<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+            TaskId id = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                // with the default grouper, taskId.partition == partition.partition()
+                assertEquals(id.partition, partition.partition());
+
+                standbyTopics.add(partition.topic());
+            }
+        }
+
+        // check if standby partitions cover all topics
+        if (info.standbyTasks.size() > 0)
+            assertEquals(allTopics, standbyTopics);
+
+        return info;
+    }
+
+    @Test
+    public void testOnAssignment() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopicPartition t2p3 = new TopicPartition("topic2", 3);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+
+        UUID uuid = UUID.randomUUID();
+        String client1 = "client1";
+
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
+
+        List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
+        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+
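+        // simulate an assignment from the leader: two active tasks and two standby tasks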
+        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
+        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
+        partitionAssignor.onAssignment(assignment);
+
+        assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
+        assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
+        assertEquals(standbyTasks, partitionAssignor.standbyTasks());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1847e85..bf3b3b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockSourceNode;
@@ -69,17 +69,18 @@ public class StreamTaskTest {
             Collections.<StateStoreSupplier>emptyList()
     );
 
-    private StreamingConfig createConfig(final File baseDir) throws Exception {
-        return new StreamingConfig(new Properties() {
+    private StreamsConfig createConfig(final File baseDir) throws Exception {
+        return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
             }
         });
     }
@@ -102,7 +103,7 @@ public class StreamTaskTest {
     public void testProcessOrder() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
@@ -153,7 +154,7 @@ public class StreamTaskTest {
     public void testPauseResume() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5f0347d..2d531bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -37,7 +37,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -112,13 +112,14 @@ public class StreamThreadTest {
     private Properties configProps() {
         return new Properties() {
             {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }
         };
     }
@@ -132,7 +133,7 @@ public class StreamThreadTest {
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
-                              StreamingConfig config) {
+                              StreamsConfig config) {
             super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
@@ -148,7 +149,7 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChange() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
+        StreamsConfig config = new StreamsConfig(configProps());
 
         MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
         MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -258,10 +259,10 @@ public class StreamThreadTest {
         try {
             final long cleanupDelay = 1000L;
             Properties props = configProps();
-            props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
-            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+            props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
+            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
 
-            StreamingConfig config = new StreamingConfig(props);
+            StreamsConfig config = new StreamsConfig(props);
 
             File stateDir1 = new File(baseDir, task1.toString());
             File stateDir2 = new File(baseDir, task2.toString());
@@ -389,10 +390,10 @@ public class StreamThreadTest {
         try {
             final long commitInterval = 1000L;
             Properties props = configProps();
-            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-            props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+            props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
 
-            StreamingConfig config = new StreamingConfig(props);
+            StreamsConfig config = new StreamsConfig(props);
 
             MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
             MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -468,9 +469,9 @@ public class StreamThreadTest {
         }
     }
 
-    private void initPartitionGrouper(StreamingConfig config, StreamThread thread) {
+    private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {
 
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
         partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b0c9bd7..36e487b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -23,12 +23,13 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
@@ -221,10 +222,10 @@ public class KeyValueStoreTestDriver<K, V> {
     private final Serdes<K, V> serdes;
     private final Map<K, V> flushedEntries = new HashMap<>();
     private final Set<K> flushedRemovals = new HashSet<>();
-    private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
+    private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
-    private final StreamingMetrics metrics = new StreamingMetrics() {
+    private final StreamsMetrics metrics = new StreamsMetrics() {
         @Override
         public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
             return null;
@@ -248,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                    StreamPartitioner<K1, V1> partitioner) {
+                                    StreamsPartitioner<K1, V1> partitioner) {
                 recordFlushed(record.key(), record.value());
             }
         };
@@ -256,12 +257,12 @@ public class KeyValueStoreTestDriver<K, V> {
         this.stateDir.mkdirs();
 
         Properties props = new Properties();
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
 
         this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
                 serdes.valueDeserializer(), recordCollector) {
@@ -287,7 +288,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
 
             @Override
-            public StreamingMetrics metrics() {
+            public StreamsMetrics metrics() {
                 return metrics;
             }
 
@@ -313,10 +314,10 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     private void restoreEntries(StateRestoreCallback func) {
-        for (Entry<K, V> entry : restorableEntries) {
+        for (KeyValue<K, V> entry : restorableEntries) {
             if (entry != null) {
-                byte[] rawKey = serdes.rawKey(entry.key());
-                byte[] rawValue = serdes.rawValue(entry.value());
+                byte[] rawKey = serdes.rawKey(entry.key);
+                byte[] rawValue = serdes.rawValue(entry.value);
                 func.restore(rawKey, rawValue);
             }
         }
@@ -352,7 +353,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @see #checkForRestoredEntries(KeyValueStore)
      */
     public void addEntryToRestoreLog(K key, V value) {
-        restorableEntries.add(new Entry<K, V>(key, value));
+        restorableEntries.add(new KeyValue<K, V>(key, value));
     }
 
     /**
@@ -376,7 +377,7 @@ public class KeyValueStoreTestDriver<K, V> {
      *
      * @return the restore entries; never null but possibly a null iterator
      */
-    public Iterable<Entry<K, V>> restoredEntries() {
+    public Iterable<KeyValue<K, V>> restoredEntries() {
         return restorableEntries;
     }
 
@@ -390,10 +391,10 @@ public class KeyValueStoreTestDriver<K, V> {
      */
     public int checkForRestoredEntries(KeyValueStore<K, V> store) {
         int missing = 0;
-        for (Entry<K, V> entry : restorableEntries) {
-            if (entry != null) {
-                V value = store.get(entry.key());
-                if (!Objects.equals(value, entry.value())) {
+        for (KeyValue<K, V> kv : restorableEntries) {
+            if (kv != null) {
+                V value = store.get(kv.key);
+                if (!Objects.equals(value, kv.value)) {
                     ++missing;
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 2ed698c..8effd77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -21,8 +21,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
@@ -73,11 +73,11 @@ public abstract class AbstractKeyValueStoreTest {
             // Check range iteration ...
             try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
                 while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
+                    KeyValue<Integer, String> entry = iter.next();
+                    if (entry.key.equals(2))
+                        assertEquals("two", entry.value);
+                    else if (entry.key.equals(4))
+                        assertEquals("four", entry.value);
                     else
                         fail("Unexpected entry: " + entry);
                 }
@@ -86,11 +86,11 @@ public abstract class AbstractKeyValueStoreTest {
             // Check range iteration ...
             try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
                 while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
+                    KeyValue<Integer, String> entry = iter.next();
+                    if (entry.key.equals(2))
+                        assertEquals("two", entry.value);
+                    else if (entry.key.equals(4))
+                        assertEquals("four", entry.value);
                     else
                         fail("Unexpected entry: " + entry);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 80ad67f..45448e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -69,12 +69,12 @@ public class RocksDBWindowStoreTest {
     public void testPutAndFetch() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -165,12 +165,12 @@ public class RocksDBWindowStoreTest {
     public void testPutAndFetchBefore() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -261,12 +261,12 @@ public class RocksDBWindowStoreTest {
     public void testPutAndFetchAfter() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -357,12 +357,12 @@ public class RocksDBWindowStoreTest {
     public void testPutSameKeyTimestamp() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -416,12 +416,12 @@ public class RocksDBWindowStoreTest {
     public void testRolling() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -528,7 +528,7 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void testRestore() throws IOException {
-        final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
         long startTime = segmentSize * 2;
         long incr = segmentSize / 2;
 
@@ -538,7 +538,7 @@ public class RocksDBWindowStoreTest {
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -587,7 +587,7 @@ public class RocksDBWindowStoreTest {
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -655,13 +655,13 @@ public class RocksDBWindowStoreTest {
         return set;
     }
 
-    private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
+    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
         HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
 
-        for (Entry<byte[], byte[]> entry : changeLog) {
-            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
-            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
-            String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
+        for (KeyValue<byte[], byte[]> entry : changeLog) {
+            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key);
+            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key, serdes);
+            String value = entry.value == null ? null : serdes.valueFrom(entry.value);
 
             Set<String> entries = entriesByKey.get(key);
             if (entries == null) {

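Note on the hunks above: they swap the removed Entry class for org.apache.kafka.streams.KeyValue, whose key and value are public final fields rather than accessor methods; that is why entry.key() becomes entry.key. A minimal, self-contained illustration of the new access pattern (the byte values are arbitrary):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.streams.KeyValue;

    public class KeyValueAccessExample {
        public static void main(String[] args) {
            List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            changeLog.add(new KeyValue<>(new byte[]{1}, new byte[]{2}));

            for (KeyValue<byte[], byte[]> entry : changeLog) {
                // KeyValue exposes public final fields, so the old
                // entry.key()/entry.value() accessor calls become field reads.
                byte[] key = entry.key;
                byte[] value = entry.value;
            }
        }
    }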
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2dc567e..8f8e00f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -130,7 +130,7 @@ public class KStreamTestDriver {
         
         @Override
         public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                                StreamPartitioner<K, V> partitioner) {
+                                StreamsPartitioner<K, V> partitioner) {
             // The serialization is skipped.
             process(record.topic(), record.key(), record.value());
         }

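Note: this hunk only renames the type in the send() signature; the partitioner contract itself is not shown. For context, a custom partitioner under the renamed interface might look like the sketch below. The single partition(key, value, numPartitions) method is an assumption carried over from the pre-rename StreamPartitioner contract, not something this diff confirms.

    import org.apache.kafka.streams.processor.StreamsPartitioner;

    // Hypothetical sketch: route records by key hash, or return null to
    // fall back to the producer's default partitioning for null keys.
    public class KeyHashPartitioner<V> implements StreamsPartitioner<String, V> {
        @Override
        public Integer partition(String key, V value, int numPartitions) {
            if (key == null)
                return null;
            // Mask the sign bit so the modulo result is never negative.
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }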
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index a6a29cd..cb7a95c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -18,7 +18,8 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -26,7 +27,6 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Entry;
 
 import java.io.File;
 import java.util.Collections;
@@ -123,8 +123,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public StreamingMetrics metrics() {
-        return new StreamingMetrics() {
+    public StreamsMetrics metrics() {
+        return new StreamsMetrics() {
             @Override
             public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
                 return null;
@@ -192,10 +192,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return Collections.unmodifiableMap(storeMap);
     }
 
-    public void restore(String storeName, List<Entry<byte[], byte[]>> changeLog) {
+    public void restore(String storeName, List<KeyValue<byte[], byte[]>> changeLog) {
         StateRestoreCallback restoreCallback = restoreFuncs.get(storeName);
-        for (Entry<byte[], byte[]> entry : changeLog) {
-            restoreCallback.restore(entry.key(), entry.value());
+        for (KeyValue<byte[], byte[]> entry : changeLog) {
+            restoreCallback.restore(entry.key, entry.value);
         }
     }
 }

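Note: the reworked restore() above simply replays a captured changelog through the store's StateRestoreCallback. A standalone sketch of that replay pattern follows; the in-memory map standing in for a state store is illustrative and not part of this commit.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.StateRestoreCallback;

    public class RestoreReplayExample {
        public static void main(String[] args) {
            final Map<String, String> store = new HashMap<>();
            StateRestoreCallback callback = new StateRestoreCallback() {
                @Override
                public void restore(byte[] key, byte[] value) {
                    store.put(new String(key), value == null ? null : new String(value));
                }
            };

            List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            changeLog.add(new KeyValue<>("a".getBytes(), "1".getBytes()));

            // The same replay loop MockProcessorContext.restore() uses above.
            for (KeyValue<byte[], byte[]> entry : changeLog) {
                callback.restore(entry.key, entry.value);
            }
        }
    }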
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
index 98aa0d4..828b5ae 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.test;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 
 public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {

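Note: only the import changes here; the mapper body is unchanged and not shown. Its likely shape is the identity mapping sketched below. The apply(key, value) signature is assumed from the KeyValueMapper contract of this revision rather than shown in the hunk.

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KeyValueMapper;

    public class IdentityMapperSketch<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
        @Override
        public KeyValue<K, V> apply(K key, V value) {
            // Repackage the pair unchanged under the relocated KeyValue type.
            return new KeyValue<>(key, value);
        }
    }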
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index eaeed09..af6d51b 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real
  * Kafka broker, so the tests execute very quickly with very little overhead.
  * <p>
- * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamingConfig} and a
+ * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamsConfig} and a
  * TopologyBuilder, use the driver to supply an input message to the topology, and then use the driver to read and verify any
  * messages output by the topology.
  * <p>
@@ -65,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong;
  *
  * <h2>Driver setup</h2>
  * <p>
- * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamingConfig}. The
+ * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamsConfig}. The
  * configuration needs to be representative of what you'd supply to the real topology, so that means including several key
  * properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
  * (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
@@ -74,13 +74,13 @@ import java.util.concurrent.atomic.AtomicLong;
  * StringSerializer strSerializer = new StringSerializer();
  * StringDeserializer strDeserializer = new StringDeserializer();
  * Properties props = new Properties();
- * props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- * props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- * props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * StreamingConfig config = new StreamingConfig(props);
+ * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ * props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+ * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
+ * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
+ * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
+ * props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
+ * StreamsConfig config = new StreamsConfig(props);
  * TopologyBuilder builder = ...
  * ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
  * </pre>
@@ -139,11 +139,11 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Create a new test driver instance.
-     * @param config the streaming configuration for the topology
+     * @param config the stream configuration for the topology
      * @param builder the topology builder that will be used to create the topology instance
      * @param storeNames the optional names of the state stores that are used by the topology
      */
-    public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
+    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
         id = new TaskId(0, 0);
         topology = builder.build(null);
 
@@ -173,7 +173,7 @@ public class ProcessorTopologyTestDriver {
             producer,
             restoreStateConsumer,
             config,
-            new StreamingMetrics() {
+            new StreamsMetrics() {
                 @Override
                 public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
                     return null;
@@ -265,7 +265,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link StateStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -281,7 +281,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link KeyValueStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to

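Note: putting the renamed pieces together, a minimal driver setup under the new names could look like the sketch below. It restates the javadoc fragment above in compilable form: the constructor signature and config keys come straight from this diff, while the empty TopologyBuilder stands in for the elided "builder = ..." and a real test would wire sources, processors, and sinks before constructing the driver.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.test.ProcessorTopologyTestDriver;

    public class DriverSetupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Required by StreamsConfig, but the driver never contacts a broker.
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
            props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            StreamsConfig config = new StreamsConfig(props);

            TopologyBuilder builder = new TopologyBuilder();
            ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
        }
    }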
