kafka-commits mailing list archives

From: gwens...@apache.org
Subject: [1/2] kafka git commit: KAFKA-2804: manage changelog topics through ZK in PartitionAssignor
Date: Mon, 07 Dec 2015 23:12:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 23f36c590 -> d05fa0a03


http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
index a491e4a..3b3fc9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -37,13 +38,11 @@ public class StreamingConfigTest {
 
     private Properties props = new Properties();
     private StreamingConfig streamingConfig;
-    private StreamThread streamThreadPlaceHolder = null;
+    private StreamThread streamThreadPlaceHolder;
 
 
     @Before
     public void setUp() {
-        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
-        props.put("group.id", "test-consumer-group");
         props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -53,18 +52,24 @@ public class StreamingConfigTest {
         streamingConfig = new StreamingConfig(props);
     }
 
-
+    @Test
+    public void testGetProducerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamingConfig.getProducerConfigs("client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
+    }
 
     @Test
     public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder);
-        assertEquals(returnedProps.get("group.id"), "test-consumer-group");
+        Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
+        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
 
     }
 
     @Test
     public void testGetRestoreConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs();
-        assertNull(returnedProps.get("group.id"));
+        Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs("client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
+        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
     }
 }
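
The test changes above capture the new client-id scoping: group.id is no longer set as a
user-level streams property, getConsumerConfigs(...) now takes the job id and base client id,
and each internal client derives a suffixed client.id. A minimal sketch of the derived ids,
assuming a StreamingConfig built with the same required props as setUp() above:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamingConfig;

public class ClientIdExample {
    // Sketch: each internal client id is derived from the base client id
    // passed in by the caller, per the assertions in StreamingConfigTest.
    static void printDerivedClientIds(StreamingConfig config) {
        Map<String, Object> producerProps = config.getProducerConfigs("client");
        System.out.println(producerProps.get(ConsumerConfig.CLIENT_ID_CONFIG));  // client-producer

        Map<String, Object> restoreProps = config.getRestoreConsumerConfigs("client");
        System.out.println(restoreProps.get(ConsumerConfig.CLIENT_ID_CONFIG));   // client-restore-consumer
        System.out.println(restoreProps.get(ConsumerConfig.GROUP_ID_CONFIG));    // null: restore consumer has no group
    }
}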

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index 2a5ca9b..c11d0c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import org.junit.Test;
 
@@ -43,42 +44,39 @@ public class DefaultPartitionGrouperTest {
             new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+    private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
 
     @Test
     public void testGrouping() {
         PartitionGrouper grouper = new DefaultPartitionGrouper();
         int topicGroupId;
-        Map<TaskId, Set<TopicPartition>> expected;
+        Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask;
         Map<Integer, Set<String>> topicGroups;
 
         topicGroups = new HashMap<>();
         topicGroups.put(0, mkSet("topic1"));
         topicGroups.put(1, mkSet("topic2"));
-        grouper.topicGroups(topicGroups);
 
-        expected = new HashMap<>();
+        expectedPartitionsForTask = new HashMap<>();
         topicGroupId = 0;
-        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
-        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
-        expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
         topicGroupId++;
-        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
-        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
 
-        assertEquals(expected, grouper.partitionGroups(metadata));
+        assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
 
         topicGroups = new HashMap<>();
         topicGroups.put(0, mkSet("topic1", "topic2"));
-        grouper.topicGroups(topicGroups);
 
-        expected = new HashMap<>();
+        expectedPartitionsForTask = new HashMap<>();
         topicGroupId = 0;
-        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
-        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
-        expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
+        expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
 
-        assertEquals(expected, grouper.partitionGroups(metadata));
+        assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
     }
-
 }
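
partitionGroups is now a pure function of the topic groups and the cluster metadata; the
separate topicGroups(...) priming call is gone. A minimal sketch of the new call pattern,
assuming a Cluster built like the one in the test above:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

import static org.apache.kafka.common.utils.Utils.mkSet;

public class GrouperUsage {
    static Map<TaskId, Set<TopicPartition>> group(Cluster metadata) {
        PartitionGrouper grouper = new DefaultPartitionGrouper();

        Map<Integer, Set<String>> topicGroups = new HashMap<>();
        topicGroups.put(0, mkSet("topic1"));
        topicGroups.put(1, mkSet("topic2"));

        // Before this change: grouper.topicGroups(topicGroups);
        //                     grouper.partitionGroups(metadata);
        return grouper.partitionGroups(topicGroups, metadata);
    }
}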

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f6924ad..af0b3c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -19,11 +19,13 @@ package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -208,12 +210,12 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
-        Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
-        expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
-        expectedTopicGroups.put(2, mkSet("topic-5"));
+        Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet()));
+        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet()));
+        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -235,18 +237,23 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(new MockStateStoreSupplier("strore-1", false), "processor-1", "processor-2");
+        builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
         builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(new MockStateStoreSupplier("strore-2", false), "processor-3", "processor-4");
+        builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
 
-        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+        builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
+        StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
+        builder.addStateStore(supplier);
+        builder.connectProcessorAndStateStores("processor-5", "store-3");
+
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
-        Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
-        expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
-        expectedTopicGroups.put(2, mkSet("topic-5"));
+        Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("store-1")));
+        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), mkSet("store-2")));
+        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), mkSet("store-3")));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
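
topicGroups() now returns Map<Integer, TopicsInfo>, pairing each sub-topology's source topics
with its state store names so the assignor can derive per-task changelog topics. A minimal
sketch of building expected entries, following the tests above:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;

import static org.apache.kafka.common.utils.Utils.mkSet;

public class TopicsInfoExample {
    public static void main(String[] args) {
        Map<Integer, TopicsInfo> expected = new HashMap<>();
        // Stateful sub-topology: source topics plus the stores attached to it.
        expected.put(0, new TopicsInfo(mkSet("topic-1", "topic-2"), mkSet("store-1")));
        // Stateless sub-topology: an empty store set.
        expected.put(1, new TopicsInfo(mkSet("topic-3"), Collections.<String>emptySet()));
        System.out.println(expected);
    }
}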

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
index 43ffa7b..92d7b6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
@@ -35,9 +35,9 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -59,7 +59,10 @@ public class KafkaStreamingPartitionAssignorTest {
     private TopicPartition t2p0 = new TopicPartition("topic2", 0);
     private TopicPartition t2p1 = new TopicPartition("topic2", 1);
     private TopicPartition t2p2 = new TopicPartition("topic2", 2);
-    private TopicPartition t2p3 = new TopicPartition("topic2", 3);
+    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
+    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
+    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
+    private TopicPartition t3p3 = new TopicPartition("topic3", 3);
 
     private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
 
@@ -69,27 +72,15 @@ public class KafkaStreamingPartitionAssignorTest {
             new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0])
+            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
     );
 
     private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
 
-    private ByteBuffer subscriptionUserData() {
-        UUID uuid = UUID.randomUUID();
-        ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + 4);
-        // version
-        buf.putInt(1);
-        // encode client clientUUID
-        buf.putLong(uuid.getMostSignificantBits());
-        buf.putLong(uuid.getLeastSignificantBits());
-        // previously running tasks
-        buf.putInt(0);
-        // cached tasks
-        buf.putInt(0);
-        buf.rewind();
-        return buf;
-    }
-
     private final TaskId task0 = new TaskId(0, 0);
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
@@ -131,8 +122,9 @@ public class KafkaStreamingPartitionAssignorTest {
                 new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
                 new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
 
-        UUID uuid = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()) {
+        String clientId = "client-id";
+        UUID processId = UUID.randomUUID();
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             public Set<TaskId> prevTasks() {
                 return prevTasks;
@@ -144,7 +136,7 @@ public class KafkaStreamingPartitionAssignorTest {
         };
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread));
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
 
         PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
 
@@ -154,12 +146,12 @@ public class KafkaStreamingPartitionAssignorTest {
         Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
         standbyTasks.removeAll(prevTasks);
 
-        SubscriptionInfo info = new SubscriptionInfo(uuid, prevTasks, standbyTasks);
+        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
         assertEquals(info.encode(), subscription.userData());
     }
 
     @Test
-    public void testAssign() throws Exception {
+    public void testAssignBasic() throws Exception {
         StreamingConfig config = new StreamingConfig(configProps());
 
         MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
@@ -182,11 +174,13 @@ public class KafkaStreamingPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
 
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -199,7 +193,6 @@ public class KafkaStreamingPartitionAssignorTest {
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // check assigned partitions
-
         assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
                 Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
         assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
@@ -216,8 +209,7 @@ public class KafkaStreamingPartitionAssignorTest {
         AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
         allActiveTasks.addAll(info11.activeTasks);
 
-        // check active tasks assigned to the first client
-        assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+        assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
 
         // the third consumer
         AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
@@ -227,7 +219,135 @@ public class KafkaStreamingPartitionAssignorTest {
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
 
         assertEquals(3, allActiveTasks.size());
-        assertEquals(allTasks, new HashSet<>(allActiveTasks));
+        assertEquals(allTasks, allActiveTasks);
+    }
+
+    @Test
+    public void testAssignWithNewTasks() throws Exception {
+        StreamingConfig config = new StreamingConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addSource("source3", "topic3");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
+        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
+
+        // assuming that previous tasks do not have topic3
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
+        // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
+        // then later ones will be re-assigned to other hosts due to load balancing
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        Set<TopicPartition> allPartitions = new HashSet<>();
+        AssignmentInfo info;
+
+        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer10").partitions());
+
+        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer11").partitions());
+
+        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer20").partitions());
+
+        assertEquals(allTasks, allActiveTasks);
+        assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
+    }
+
+    @Test
+    public void testAssignWithStates() throws Exception {
+        StreamingConfig config = new StreamingConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
+        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
+
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
+        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
+        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
+
+        List<String> topics = Utils.mkList("topic1", "topic2");
+
+        TaskId task00 = new TaskId(0, 0);
+        TaskId task01 = new TaskId(0, 1);
+        TaskId task02 = new TaskId(0, 2);
+        TaskId task10 = new TaskId(1, 0);
+        TaskId task11 = new TaskId(1, 1);
+        TaskId task12 = new TaskId(1, 2);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match
+        assertEquals(2, assignments.get("consumer10").partitions().size());
+        assertEquals(2, assignments.get("consumer11").partitions().size());
+        assertEquals(2, assignments.get("consumer20").partitions().size());
+
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+
+        // check tasks for state topics
+        assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
+        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
+        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
     }
 
     @Test
@@ -257,11 +377,13 @@ public class KafkaStreamingPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
 
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -298,10 +420,10 @@ public class KafkaStreamingPartitionAssignorTest {
         // all task ids are in the active tasks and also in the standby tasks
 
         assertEquals(3, allActiveTasks.size());
-        assertEquals(allTasks, new HashSet<>(allActiveTasks));
+        assertEquals(allTasks, allActiveTasks);
 
         assertEquals(3, allStandbyTasks.size());
-        assertEquals(allTasks, new HashSet<>(allStandbyTasks));
+        assertEquals(allTasks, allStandbyTasks);
     }
 
     private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
@@ -354,17 +476,20 @@ public class KafkaStreamingPartitionAssignorTest {
         MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
 
+        TopicPartition t2p3 = new TopicPartition("topic2", 3);
+
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
 
         UUID uuid = UUID.randomUUID();
+        String client1 = "client1";
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime());
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread));
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
 
         List<TaskId> activeTaskList = Utils.mkList(task0, task3);
         Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -375,8 +500,8 @@ public class KafkaStreamingPartitionAssignorTest {
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);
 
-        assertEquals(Utils.mkSet(task0), partitionAssignor.taskIds(t1p0));
-        assertEquals(Utils.mkSet(task3), partitionAssignor.taskIds(t2p3));
+        assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
+        assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
         assertEquals(standbyTasks, partitionAssignor.standbyTasks());
     }
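
Two accessors carry the metadata these tests exercise: tasksForPartition(...) (renamed from
taskIds(...)) and the new tasksForState(...), which maps a state store to the tasks whose
changelog partitions back it. A minimal usage sketch, assuming an assignor that has already
gone through configure(...) and onAssignment(...) as in the tests above:

import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;

public class AssignorQueries {
    // Sketch: query task ownership by partition and by state store.
    static void show(KafkaStreamingPartitionAssignor assignor) {
        Set<TaskId> byPartition = assignor.tasksForPartition(new TopicPartition("topic1", 0));
        Set<TaskId> byStore = assignor.tasksForState("store1");
        System.out.println(byPartition + " " + byStore);
    }
}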
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 17bc9da..7e5ce49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -178,14 +179,21 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    private final String jobId = "test-job";
+    private final String stateDir = "test";
+    private final String persistentStoreName = "persistentStore";
+    private final String nonPersistentStoreName = "nonPersistentStore";
+    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, persistentStoreName);
+    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, nonPersistentStoreName);
+
     @Test
     public void testLockStateDirectory() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
+        File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {
             FileLock lock;
 
             // the state manager locks the directory
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
 
             try {
                 // this should not get the lock
@@ -208,15 +216,15 @@ public class ProcessorStateManagerTest {
         }
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = KafkaException.class)
     public void testNoTopic() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
+        File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {
-            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
+            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, new MockRestoreConsumer(), false);
             try {
-                stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
+                stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
             } finally {
                 stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
             }
@@ -227,41 +235,42 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testRegisterPersistentStore() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
+        File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {
             long lastCheckpointedOffset = 10L;
+
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
+            checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
-                    new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0])
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
+
+            restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
+                    new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
+                    new PartitionInfo(persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            TopicPartition partition = new TopicPartition("persistentStore", 2);
+            TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
             restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
-            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
-
-            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
+            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); // persistent store
             try {
                 restoreConsumer.reset();
 
                 ArrayList<Integer> expectedKeys = new ArrayList<>();
-                long offset = -1L;
+                long offset;
                 for (int i = 1; i <= 3; i++) {
                     offset = (long) i;
                     int key = i * 10;
                     expectedKeys.add(key);
                     restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>("persistentStore", 2, offset, key, 0)
+                            new ConsumerRecord<>(persistentStoreTopicName, 2, offset, key, 0)
                     );
                 }
 
-                stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
+                stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
-                assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition);
+                assertEquals(new TopicPartition(persistentStoreTopicName, 2), restoreConsumer.assignedPartition);
                 assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
                 assertFalse(restoreConsumer.seekToBeginingCalled);
                 assertTrue(restoreConsumer.seekToEndCalled);
@@ -278,24 +287,26 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testRegisterNonPersistentStore() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
+        File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {
             long lastCheckpointedOffset = 10L;
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
-                    new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0])
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, baseDir, restoreConsumer, false);
+
+            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+            checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
+
+            restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                    new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
+                    new PartitionInfo(nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            TopicPartition partition = new TopicPartition("persistentStore", 2);
+            TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
             restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
-            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
+            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
@@ -306,13 +317,13 @@ public class ProcessorStateManagerTest {
                     int key = i;
                     expectedKeys.add(i);
                     restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>("nonPersistentStore", 2, offset, key, 0)
+                            new ConsumerRecord<>(nonPersistentStoreTopicName, 2, offset, key, 0)
                     );
                 }
 
-                stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
+                stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
 
-                assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition);
+                assertEquals(new TopicPartition(nonPersistentStoreTopicName, 2), restoreConsumer.assignedPartition);
                 assertEquals(0L, restoreConsumer.seekOffset);
                 assertTrue(restoreConsumer.seekToBeginingCalled);
                 assertTrue(restoreConsumer.seekToEndCalled);
@@ -328,37 +339,44 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testChangeLogOffsets() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
+        File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {
             long lastCheckpointedOffset = 10L;
+            String storeName1 = "store1";
+            String storeName2 = "store2";
+
+            String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
+            String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
+
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition("store1", 0), lastCheckpointedOffset));
+            checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("store1", Utils.mkList(
-                    new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0])
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, baseDir, restoreConsumer, true); // standby
+
+            restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
+                    new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
             ));
-            restoreConsumer.updatePartitions("store2", Utils.mkList(
-                    new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0])
+            restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList(
+                    new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            TopicPartition partition1 = new TopicPartition("store1", 0);
-            TopicPartition partition2 = new TopicPartition("store2", 0);
+            TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
+            TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
 
             Map<TopicPartition, Long> endOffsets = new HashMap<>();
             endOffsets.put(partition1, 13L);
             endOffsets.put(partition2, 17L);
             restoreConsumer.updateEndOffsets(endOffsets);
 
-            MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore("store1", true);
-            MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore("store2", true);
+            MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
+            MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(0, baseDir, restoreConsumer, true); // standby
             try {
                 restoreConsumer.reset();
 
-                stateMgr.register(store1, store1.stateRestoreCallback);
-                stateMgr.register(store2, store2.stateRestoreCallback);
+                stateMgr.register(store1, true, store1.stateRestoreCallback);
+                stateMgr.register(store2, true, store2.stateRestoreCallback);
 
                 Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
 
@@ -379,21 +397,22 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testGetStore() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
+        File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("mockStore", Utils.mkList(
-                    new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
+
+            restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                    new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
+            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
             try {
-                stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
+                stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
                 assertNull(stateMgr.getStore("noSuchStore"));
-                assertEquals(mockStateStore, stateMgr.getStore("mockStore"));
+                assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
 
             } finally {
                 stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -405,7 +424,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testClose() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
+        File baseDir = Files.createTempDirectory(stateDir).toFile();
         File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME);
         try {
             // write an empty checkpoint file
@@ -413,32 +432,33 @@ public class ProcessorStateManagerTest {
             oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
-                    new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0])
+            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, baseDir, restoreConsumer, false);
+
+            restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
+                    new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
             ));
-            restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
-                    new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0])
+            restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                    new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
             ));
 
             // set up ack'ed offsets
             HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
-            ackedOffsets.put(new TopicPartition("persistentStore", 1), 123L);
-            ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
-            ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
+            ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
+            ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
+            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(jobId, "otherTopic"), 1), 789L);
 
-            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
-            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
+            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
+            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
             try {
                 // make sure the checkpoint file is deleted
                 assertFalse(checkpointFile.exists());
 
                 restoreConsumer.reset();
-                stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
+                stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
                 restoreConsumer.reset();
-                stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
+                stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
             } finally {
                 // close the state manager with the ack'ed offsets
                 stateMgr.close(ackedOffsets);
@@ -455,7 +475,7 @@ public class ProcessorStateManagerTest {
             OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
             Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
             assertEquals(1, checkpointedOffsets.size());
-            assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition("persistentStore", 1)));
+            assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
         } finally {
             Utils.delete(baseDir);
         }
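
Changelog topics are now namespaced by job id, and register(...) gained a boolean flag (the
tests pass true throughout). A minimal sketch of the topic derivation, which per
StandbyTaskTest below amounts to jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX:

import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

public class ChangelogTopics {
    public static void main(String[] args) {
        // "test-job" and "persistentStore" mirror the fixture names above.
        String topic = ProcessorStateManager.storeChangelogTopic("test-job", "persistentStore");
        System.out.println(topic); // jobId-scoped changelog topic name
    }
}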

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 9a43e46..00b983d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -53,16 +53,22 @@ public class StandbyTaskTest {
 
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
 
-    private final TopicPartition partition1 = new TopicPartition("store1", 1);
-    private final TopicPartition partition2 = new TopicPartition("store2", 1);
+    private final String jobId = "test-job";
+    private final String storeName1 = "store1";
+    private final String storeName2 = "store2";
+    private final String storeChangelogTopicName1 = jobId + "-" + storeName1 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+    private final String storeChangelogTopicName2 = jobId + "-" + storeName2 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+
+    private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
+    private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
 
     private final Set<TopicPartition> topicPartitions = Collections.emptySet();
     private final ProcessorTopology topology = new ProcessorTopology(
             Collections.<ProcessorNode>emptyList(),
             Collections.<String, SourceNode>emptyMap(),
             Utils.<StateStoreSupplier>mkList(
-                    new MockStateStoreSupplier(partition1.topic(), false),
-                    new MockStateStoreSupplier(partition2.topic(), true)
+                    new MockStateStoreSupplier(storeName1, false),
+                    new MockStateStoreSupplier(storeName2, true)
             )
     );
 
@@ -91,25 +97,31 @@ public class StandbyTaskTest {
     @Before
     public void setup() {
         restoreStateConsumer.reset();
-        restoreStateConsumer.updatePartitions("store1", Utils.mkList(
-                new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo("store1", 1, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo("store1", 2, Node.noNode(), new Node[0], new Node[0])
+        restoreStateConsumer.updatePartitions(storeChangelogTopicName1, Utils.mkList(
+                new PartitionInfo(storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(storeChangelogTopicName1, 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])
         ));
 
-        restoreStateConsumer.updatePartitions("store2", Utils.mkList(
-                new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo("store2", 1, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo("store2", 2, Node.noNode(), new Node[0], new Node[0])
+        System.out.println("added " + storeChangelogTopicName1);
+
+        restoreStateConsumer.updatePartitions(storeChangelogTopicName2, Utils.mkList(
+                new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
         ));
+
+        System.out.println("added " + storeChangelogTopicName2);
     }
 
     @Test
     public void testStorePartitions() throws Exception {
+        System.out.println("STARTED");
+
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
 
@@ -124,7 +136,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -143,7 +155,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -168,9 +180,9 @@ public class StandbyTaskTest {
 
             StandbyContextImpl context = (StandbyContextImpl) task.context();
             MockStateStoreSupplier.MockStateStore store1 =
-                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition1.topic());
+                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName1);
             MockStateStoreSupplier.MockStateStore store2 =
-                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition2.topic());
+                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName2);
 
             assertEquals(Collections.emptyList(), store1.keys);
             assertEquals(Utils.mkList(1, 2, 3), store2.keys);

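Note: the StandbyTask hunks above thread a jobId through the constructor and
look stores up by store name rather than by the changelog partition's topic.
A minimal sketch of the updated call shape, using the names from this test;
the trailing null is assumed to be an optional metrics argument, since it is
left unset throughout these tests:

    StreamingConfig config = createConfig(baseDir);
    StandbyTask task = new StandbyTask(
            taskId,                // TaskId of this standby task
            jobId,                 // new: job id, used to derive changelog topic names
            topicPartitions,       // assigned changelog partitions
            topology,              // ProcessorTopology under test
            consumer,              // main consumer (a mock in these tests)
            restoreStateConsumer,  // consumer that replays the changelog topics
            config,
            null);                 // assumed metrics argument, unset here
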
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index aae5a7d..1847e85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -103,7 +103,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -154,7 +154,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9f31450..5f0347d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -59,7 +59,8 @@ import java.util.UUID;
 
 public class StreamThreadTest {
 
-    private UUID uuid = UUID.randomUUID();
+    private String clientId = "clientId";
+    private UUID processId = UUID.randomUUID();
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
     private TopicPartition t1p2 = new TopicPartition("topic1", 2);
@@ -90,7 +91,7 @@ public class StreamThreadTest {
         ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + 4);
         // version
         buf.putInt(1);
-        // encode client clientUUID
-        buf.putLong(uuid.getMostSignificantBits());
-        buf.putLong(uuid.getLeastSignificantBits());
+        // encode client processId
+        buf.putLong(processId.getMostSignificantBits());
+        buf.putLong(processId.getLeastSignificantBits());
         // previously running tasks
@@ -132,7 +133,7 @@ public class StreamThreadTest {
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamingConfig config) {
-            super(id, partitions, topology, consumer, producer, restoreConsumer, config, null);
+            super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -159,7 +160,7 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build(id.topicGroupId);
@@ -279,7 +280,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -401,7 +402,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -471,7 +472,7 @@ public class StreamThreadTest {
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
 
-        partitionAssignor.configure(config.getConsumerConfigs(thread));
+        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
 
         Map<String, PartitionAssignor.Assignment> assignments =
                 partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));

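Note: the subscription metadata built in the hunk above has a small fixed
layout. A hedged sketch, inferred only from the ByteBuffer writes visible in
this diff; the meaning of the final int is an assumption, not taken from
SubscriptionInfo itself:

    // requires java.nio.ByteBuffer and java.util.UUID
    private ByteBuffer encodeSubscription(UUID processId) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + 4);
        buf.putInt(1);                                    // version
        buf.putLong(processId.getMostSignificantBits());  // process id (16 bytes)
        buf.putLong(processId.getLeastSignificantBits());
        buf.putInt(0);                                    // count of previously running tasks
        buf.putInt(0);                                    // assumed: count of standby tasks
        buf.rewind();
        return buf;
    }
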
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index acc9a9d..3119bee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -31,13 +31,14 @@ public class SubscriptionInfoTest {
 
     @Test
     public void testEncodeDecode() {
-        UUID clientUUID = UUID.randomUUID();
+        UUID processId = UUID.randomUUID();
+
         Set<TaskId> activeTasks =
                 new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
         Set<TaskId> standbyTasks =
                 new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
 
-        SubscriptionInfo info = new SubscriptionInfo(clientUUID, activeTasks, standbyTasks);
+        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks);
         SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
 
         assertEquals(info, decoded);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 28cc3af..9e24741 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -269,7 +269,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
 
             @Override
-            public void register(StateStore store, StateRestoreCallback func) {
+            public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
                 storeMap.put(store.name(), store);
                 restoreEntries(func);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 81a9add..5d42a63 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -127,7 +127,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public void register(StateStore store, StateRestoreCallback func) {
+    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
         storeMap.put(store.name(), store);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 16635b7..3b17afe 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -66,7 +66,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
 
         @Override
         public void init(ProcessorContext context) {
-            context.register(this, stateRestoreCallback);
+            context.register(this, true, stateRestoreCallback);
             initialized = true;
         }
 

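Note: the register() signature change above recurs in every mock in this
patch (KeyValueStoreTestDriver, MockProcessorContext, MockStateStoreSupplier,
and UnlimitedWindowDef below): state stores now declare at registration time
whether their writes should be logged to a changelog topic. A minimal sketch
of a store opting in, mirroring MockStateStore.init() above; whether passing
'false' fully disables changelog restoration is an assumption:

    @Override
    public void init(ProcessorContext context) {
        // 'true' marks this store's updates as logged to a changelog topic,
        // so they can be restored via the restore consumer on startup
        context.register(this, true, stateRestoreCallback);
    }
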
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index fdb4d57..879c172 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -124,6 +125,8 @@ public class ProcessorTopologyTestDriver {
 
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
 
+    private final String jobId = "test-driver-job";
+
     private final TaskId id;
     private final ProcessorTopology topology;
     private final StreamTask task;
@@ -158,6 +161,7 @@ public class ProcessorTopologyTestDriver {
         }
 
         task = new StreamTask(id,
+            jobId,
             partitionsByTopic.values(),
             topology,
             consumer,
@@ -324,12 +328,12 @@ public class ProcessorTopologyTestDriver {
         };
         // For each store name ...
         for (String storeName : storeNames) {
-            String topicName = storeName;
+            String topicName = jobId + "-" + storeName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
             List<PartitionInfo> partitionInfos = new ArrayList<>();
             partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
             consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
         }

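Note: the topicName change above is the visible core of this patch for the
test driver: restore topics are now per-job changelog topics rather than the
bare store name. A sketch of the naming rule, assuming
ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX is "-changelog":

    static String changelogTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + "-changelog";
    }

    // changelogTopic("test-driver-job", "my-store") returns
    // "test-driver-job-my-store-changelog" ("my-store" is a hypothetical store name)
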
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
index 04c8f61..3da1ca7 100644
--- a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
+++ b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
@@ -49,7 +49,7 @@ public class UnlimitedWindowDef<K, V> implements WindowSupplier<K, V> {
 
         @Override
         public void init(ProcessorContext context) {
-            context.register(this, null);
+            context.register(this, true, null);
         }
 
         @Override

