kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3395: prefix job id to internal topic names
Date Mon, 14 Mar 2016 21:50:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ac7b2e95d -> c1a56c683


KAFKA-3395: prefix job id to internal topic names

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1062 from ymatsuda/k3395


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1a56c68
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1a56c68
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1a56c68

Branch: refs/heads/trunk
Commit: c1a56c6839e77f3de5266315a92b236a379ec857
Parents: ac7b2e9
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Mon Mar 14 14:50:24 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Mar 14 14:50:24 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      | 76 +++++++++++++-------
 .../internals/StreamPartitionAssignor.java      |  6 +-
 .../processor/internals/StreamThread.java       |  6 +-
 .../kstream/internals/KStreamImplTest.java      |  2 +-
 .../streams/processor/TopologyBuilderTest.java  | 28 +++++---
 .../internals/ProcessorTopologyTest.java        |  2 +-
 .../internals/StreamPartitionAssignorTest.java  |  6 +-
 .../processor/internals/StreamThreadTest.java   |  6 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 tests/kafkatest/tests/streams_bounce_test.py    |  1 -
 tests/kafkatest/tests/streams_smoke_test.py     |  1 -
 12 files changed, 84 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7e3cab9..6e5aec5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -85,7 +85,7 @@ public class TopologyBuilder {
             this.name = name;
         }
 
-        public abstract ProcessorNode build();
+        public abstract ProcessorNode build(String jobId);
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
@@ -105,7 +105,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build() {
+        public ProcessorNode build(String jobId) {
             return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
@@ -124,12 +124,12 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build() {
+        public ProcessorNode build(String jobId) {
             return new SourceNode(name, keyDeserializer, valDeserializer);
         }
     }
 
-    private static class SinkNodeFactory extends NodeFactory {
+    private class SinkNodeFactory extends NodeFactory {
         public final String[] parents;
         public final String topic;
         private Serializer keySerializer;
@@ -147,8 +147,13 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build() {
-            return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
+        public ProcessorNode build(String jobId) {
+            if (internalTopicNames.contains(topic)) {
+                // prefix the job id to the internal topic name
+                return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer,
partitioner);
+            } else {
+                return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
+            }
         }
     }
 
@@ -491,7 +496,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups() {
+    public Map<Integer, TopicsInfo> topicGroups(String jobId) {
         Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
         if (nodeGroups == null)
@@ -506,27 +511,35 @@ public class TopologyBuilder {
                 // if the node is a source node, add to the source topics
                 String[] topics = nodeToSourceTopics.get(node);
                 if (topics != null) {
-                    sourceTopics.addAll(Arrays.asList(topics));
-
                     // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
-                        if (this.internalTopicNames.contains(topic))
-                            internalSourceTopics.add(topic);
+                        if (this.internalTopicNames.contains(topic)) {
+                            // prefix the job id to the internal topic name
+                            String internalTopic = jobId + "-" + topic;
+                            internalSourceTopics.add(internalTopic);
+                            sourceTopics.add(internalTopic);
+                        } else {
+                            sourceTopics.add(topic);
+                        }
                     }
                 }
 
                 // if the node is a sink node, add to the sink topics
                 String topic = nodeToSinkTopic.get(node);
-                if (topic != null)
-                    sinkTopics.add(topic);
+                if (topic != null) {
+                    if (internalTopicNames.contains(topic)) {
+                        // prefix the job id to the change log topic name
+                        sinkTopics.add(jobId + "-" + topic);
+                    } else {
+                        sinkTopics.add(topic);
+                    }
+                }
 
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
-
-                    // we store the changelog topic here without the job id prefix
-                    // since it is within a single job and is only used for
                     if (stateFactory.isInternal && stateFactory.users.contains(node))
{
-                        stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+                        // prefix the job id to the change log topic name
+                        stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name()
+ ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
                     }
                 }
             }
@@ -586,7 +599,7 @@ public class TopologyBuilder {
 
         return nodeGroups;
     }
-    
+
     /**
      * Asserts that the streams of the specified source nodes must be copartitioned.
      *
@@ -624,7 +637,7 @@ public class TopologyBuilder {
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public ProcessorTopology build(Integer topicGroupId) {
+    public ProcessorTopology build(String jobId, Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -632,11 +645,11 @@ public class TopologyBuilder {
             // when nodeGroup is null, we build the full topology. this is used in some tests.
             nodeGroup = null;
         }
-        return build(nodeGroup);
+        return build(jobId, nodeGroup);
     }
 
     @SuppressWarnings("unchecked")
-    private ProcessorTopology build(Set<String> nodeGroup) {
+    private ProcessorTopology build(String jobId, Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -645,7 +658,7 @@ public class TopologyBuilder {
         // create processor nodes in a topological order ("nodeFactories" is already topologically
sorted)
         for (NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                ProcessorNode node = factory.build();
+                ProcessorNode node = factory.build(jobId);
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
 
@@ -660,7 +673,12 @@ public class TopologyBuilder {
                     }
                 } else if (factory instanceof SourceNodeFactory) {
                     for (String topic : ((SourceNodeFactory) factory).topics) {
-                        topicSourceMap.put(topic, (SourceNode) node);
+                        if (internalTopicNames.contains(topic)) {
+                            // prefix the job id to the internal topic name
+                            topicSourceMap.put(jobId + "-" + topic, (SourceNode) node);
+                        } else {
+                            topicSourceMap.put(topic, (SourceNode) node);
+                        }
                     }
                 } else if (factory instanceof SinkNodeFactory) {
                     for (String parent : ((SinkNodeFactory) factory).parents) {
@@ -679,7 +697,15 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this
builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as
new sources are added; never null
      */
-    public Set<String> sourceTopics() {
-        return Collections.unmodifiableSet(sourceTopicNames);
+    public Set<String> sourceTopics(String jobId) {
+        Set<String> topics = new HashSet<>();
+        for (String topic : sourceTopicNames) {
+            if (internalTopicNames.contains(topic)) {
+                topics.add(jobId + "-" + topic);
+            } else {
+                topics.add(topic);
+            }
+        }
+        return Collections.unmodifiableSet(topics);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 13f269b..266df3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
-        this.topicGroups = streamThread.builder.topicGroups();
+        this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
@@ -350,7 +350,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             topicToTaskIds.putAll(internalSourceTopicToTaskIds);
 
             for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet())
{
-                String topic = streamThread.jobId + "-" + entry.getKey();
+                String topic = entry.getKey();
 
                 // the expected number of partitions is the max value of TaskId.partition
+ 1
                 int numPartitions = 0;
@@ -445,7 +445,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId,
stateName));
     }
 
     public Set<TaskId> tasksForPartition(TopicPartition partition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 4ce86ac..e9343e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,7 +173,7 @@ public class StreamThread extends Thread {
         this.jobId = jobId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics();
+        this.sourceTopics = builder.sourceTopics(jobId);
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG,
PartitionGrouper.class);
@@ -580,7 +580,7 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions)
{
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(id.topicGroupId);
+        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
 
         return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer,
config, sensors);
     }
@@ -650,7 +650,7 @@ public class StreamThread extends Thread {
     protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions)
{
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(id.topicGroupId);
+        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
             return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer,
config, sensors);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1ce56ff..3d3a9e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -137,6 +137,6 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            builder.build(null).processors().size());
+            builder.build("X", null).processors().size());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 0635bd2..9af313a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -142,8 +142,14 @@ public class TopologyBuilderTest {
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
+        builder.addInternalTopic("topic-3");
 
-        assertEquals(3, builder.sourceTopics().size());
+        Set<String> expected = new HashSet<String>();
+        expected.add("topic-1");
+        expected.add("topic-2");
+        expected.add("X-topic-3");
+
+        assertEquals(expected, builder.sourceTopics("X"));
     }
 
     @Test(expected = TopologyBuilderException.class)
@@ -184,13 +190,13 @@ public class TopologyBuilderTest {
 
         StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
         builder.addStateStore(supplier);
-        suppliers = builder.build(null).stateStoreSuppliers();
+        suppliers = builder.build("X", null).stateStoreSuppliers();
         assertEquals(0, suppliers.size());
 
         builder.addSource("source-1", "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.connectProcessorAndStateStores("processor-1", "store-1");
-        suppliers = builder.build(null).stateStoreSuppliers();
+        suppliers = builder.build("X", null).stateStoreSuppliers();
         assertEquals(1, suppliers.size());
         assertEquals(supplier.name(), suppliers.get(0).name());
     }
@@ -212,7 +218,7 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1",
"topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
@@ -250,12 +256,12 @@ public class TopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1",
"topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
-        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3",
"topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
-        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1",
"topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X",
"store-1"))));
+        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3",
"topic-4"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X",
"store-2"))));
+        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X",
"store-3"))));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -275,9 +281,9 @@ public class TopologyBuilderTest {
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        ProcessorTopology topology0 = builder.build(0);
-        ProcessorTopology topology1 = builder.build(1);
-        ProcessorTopology topology2 = builder.build(2);
+        ProcessorTopology topology0 = builder.build("X", 0);
+        ProcessorTopology topology1 = builder.build("X", 1);
+        ProcessorTopology topology2 = builder.build("X", 2);
 
         assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
         assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 40cce93..c8115b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -97,7 +97,7 @@ public class ProcessorTopologyTest {
         builder.addSink("sink-1", "topic-3", "processor-1");
         builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology topology = builder.build(null);
+        final ProcessorTopology topology = builder.build("X", null);
 
         assertEquals(6, topology.processors().size());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 9ff0af0..7f37bda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -521,7 +521,7 @@ public class StreamPartitionAssignorTest {
         builder.addSink("sink1", "topicX", "processor1");
         builder.addSource("source2", "topicX");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
-        List<String> topics = Utils.mkList("topic1", "topicX");
+        List<String> topics = Utils.mkList("topic1", "test-topicX");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         UUID uuid1 = UUID.randomUUID();
@@ -543,9 +543,7 @@ public class StreamPartitionAssignorTest {
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata,
subscriptions);
 
         // check prepared internal topics
-        // TODO: we need to change it to 1 after fixing the prefix
-        assertEquals(2, internalTopicManager.readyTopics.size());
-        assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("topicX"));
+        assertEquals(1, internalTopicManager.readyTopics.size());
         assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e072747..eaaf842 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -171,7 +171,7 @@ public class StreamThreadTest {
         StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
jobId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition>
partitionsForTask) {
-                ProcessorTopology topology = builder.build(id.topicGroupId);
+                ProcessorTopology topology = builder.build("X", id.topicGroupId);
                 return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer,
producer, mockRestoreConsumer, config);
             }
         };
@@ -298,7 +298,7 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition>
partitionsForTask) {
-                    ProcessorTopology topology = builder.build(id.topicGroupId);
+                    ProcessorTopology topology = builder.build("X", id.topicGroupId);
                     return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer,
producer, mockRestoreConsumer, config);
                 }
             };
@@ -420,7 +420,7 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition>
partitionsForTask) {
-                    ProcessorTopology topology = builder.build(id.topicGroupId);
+                    ProcessorTopology topology = builder.build("X", id.topicGroupId);
                     return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer,
producer, mockRestoreConsumer, config);
                 }
             };

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index c0c5c39..edbcb4a 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -53,7 +53,7 @@ public class KStreamTestDriver {
                              File stateDir,
                              Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
                              Serializer<?> valSerializer, Deserializer<?> valDeserializer)
{
-        this.topology = builder.build(null);
+        this.topology = builder.build("X", null);
         this.stateDir = stateDir;
         this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer,
valSerializer, valDeserializer, new MockRecordCollector());
 
@@ -127,7 +127,7 @@ public class KStreamTestDriver {
         public MockRecordCollector() {
             super(null);
         }
-        
+
         @Override
         public <K, V> void send(ProducerRecord<K, V> record, Serializer<K>
keySerializer, Serializer<V> valueSerializer,
                                 StreamPartitioner<K, V> partitioner) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 34fd10c..cf17dbe 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -146,7 +146,7 @@ public class ProcessorTopologyTestDriver {
      */
     public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String...
storeNames) {
         id = new TaskId(0, 0);
-        topology = builder.build(null);
+        topology = builder.build("X", null);
 
         // Set up the consumer and producer ...
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/tests/kafkatest/tests/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
index 5523909..d674641 100644
--- a/tests/kafkatest/tests/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams_bounce_test.py
@@ -41,7 +41,6 @@ class StreamsBounceTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @ignore
     def test_bounce(self):
         """
         Start a smoke test client, then abort (kill -9) and restart it a few times.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1a56c68/tests/kafkatest/tests/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
index ea05c5f..e3c465a 100644
--- a/tests/kafkatest/tests/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams_smoke_test.py
@@ -44,7 +44,6 @@ class StreamsSmokeTest(KafkaTest):
         self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @ignore
     def test_streams(self):
         """
         Start a few smoke test clients, then repeat start a new one, stop (cleanly) running
one a few times.


Mime
View raw message