kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3858: Add functions to print stream topologies
Date Thu, 21 Jul 2016 20:11:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 999108667 -> 933a7506e


KAFKA-3858: Add functions to print stream topologies

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Roger Hoover, Matthias J. Sax, Guozhang Wang

Closes #1619 from enothereska/KAFKA-3858-print-topology


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/933a7506
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/933a7506
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/933a7506

Branch: refs/heads/trunk
Commit: 933a7506efb57add31a1e9dbd984c1230128b855
Parents: 9991086
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Jul 21 13:11:53 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jul 21 13:11:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 16 +++++
 .../streams/processor/TopologyBuilder.java      | 15 ++++-
 .../processor/internals/AbstractTask.java       | 27 +++++++++
 .../processor/internals/ProcessorNode.java      | 17 ++++++
 .../processor/internals/ProcessorTopology.java  | 62 +++++++++++++++++++-
 .../streams/processor/internals/SinkNode.java   |  9 +++
 .../streams/processor/internals/SourceNode.java | 19 +++++-
 .../processor/internals/StandbyTask.java        |  9 +++
 .../streams/processor/internals/StreamTask.java |  8 +++
 .../processor/internals/StreamThread.java       | 33 +++++++++++
 .../processor/internals/PartitionGroupTest.java |  9 +--
 .../processor/internals/RecordQueueTest.java    |  3 +-
 .../processor/internals/StandbyTaskTest.java    |  2 +
 .../processor/internals/StreamTaskTest.java     | 16 ++---
 .../org/apache/kafka/test/MockSourceNode.java   |  4 +-
 15 files changed, 228 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 0ed0b6c..8f8cfa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -219,6 +219,22 @@ public class KafkaStreams {
     }
 
     /**
+     * Produces a string representation contain useful information about Kafka Streams
+     * Such as thread IDs, task IDs and a representation of the topology. This is useful
+     * in debugging scenarios.
+     * @return A string representation of the Kafka Streams instance.
+     */
+    public String toString() {
+        StringBuilder sb = new StringBuilder("KafkaStreams processID:" + this.processId +
"\n");
+        for (int i = 0; i < this.threads.length; i++) {
+            sb.append("\t" + this.threads[i].toString());
+        }
+        sb.append("\n");
+
+        return sb.toString();
+    }
+
+    /**
      * Sets the handler invoked when a stream thread abruptly terminates due to an uncaught
exception.
      *
      * @param eh the object to use as this thread's uncaught exception handler. If null then
this thread has no explicit handler.

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 2c02b0c..a28b270 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -157,7 +157,7 @@ public class TopologyBuilder {
         @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build(String applicationId) {
-            return new SourceNode(name, keyDeserializer, valDeserializer);
+            return new SourceNode(name, nodeToSourceTopics.get(name), keyDeserializer, valDeserializer);
         }
 
         private boolean isMatch(String topic) {
@@ -771,6 +771,7 @@ public class TopologyBuilder {
         return topicNames;
     }
 
+
     /**
      * Build the topology for the specified topic group. This is called automatically when
passing this builder into the
      * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)}
constructor.
@@ -793,6 +794,7 @@ public class TopologyBuilder {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
+        Map<String, SinkNode> topicSinkMap = new HashMap<>();
         Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
 
         // create processor nodes in a topological order ("nodeFactories" is already topologically
sorted)
@@ -823,8 +825,15 @@ public class TopologyBuilder {
                         }
                     }
                 } else if (factory instanceof SinkNodeFactory) {
-                    for (String parent : ((SinkNodeFactory) factory).parents) {
+                    SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory;
+                    for (String parent : sinkNodeFactory.parents) {
                         processorMap.get(parent).addChild(node);
+                        if (internalTopicNames.contains(sinkNodeFactory.topic)) {
+                            // prefix the internal topic name with the application id
+                            topicSinkMap.put(applicationId + "-" + sinkNodeFactory.topic,
(SinkNode) node);
+                        } else {
+                            topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
+                        }
                     }
                 } else {
                     throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
@@ -832,7 +841,7 @@ public class TopologyBuilder {
             }
         }
 
-        return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
+        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 488fc0f..70070a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -124,4 +124,31 @@ public abstract class AbstractTask {
     public StateStore getStore(final String name) {
         return stateMgr.getStore(name);
     }
+
+    /**
+     * Produces a string representation contain useful information about a StreamTask.
+     * This is useful in debugging scenarios.
+     * @return A string representation of the StreamTask instance.
+     */
+    public String toString() {
+        StringBuilder sb = new StringBuilder("StreamsTask taskId:" + this.id() + "\n");
+
+        // print topology
+        if (topology != null) {
+            sb.append("\t\t\t" + topology.toString());
+        }
+
+        // print assigned partitions
+        if (partitions != null && !partitions.isEmpty()) {
+            sb.append("\t\t\tPartitions [");
+            for (TopicPartition topicPartition : partitions) {
+                sb.append(topicPartition.toString() + ",");
+            }
+            sb.setLength(sb.length() - 1);
+            sb.append("]");
+        }
+
+        sb.append("\n");
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 50e3a0b..64ca032 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -71,4 +71,21 @@ public class ProcessorNode<K, V> {
     public void close() {
         processor.close();
     }
+
+    /**
+     * @return a string representation of this node, useful for debugging.
+     */
+    public String toString() {
+        StringBuilder sb = new StringBuilder("");
+        sb.append(name + ": ");
+        if (stateStores != null && !stateStores.isEmpty()) {
+            sb.append("stateStores [");
+            for (String store : (Set<String>) stateStores) {
+                sb.append(store + ",");
+            }
+            sb.setLength(sb.length() - 1);
+            sb.append("] ");
+        }
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index a70aa70..0316446 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -16,27 +16,37 @@
  */
 
 package org.apache.kafka.streams.processor.internals;
-
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Set;
-
 public class ProcessorTopology {
 
     private final List<ProcessorNode> processorNodes;
     private final Map<String, SourceNode> sourceByTopics;
+    private final Map<String, SinkNode> sinkByTopics;
     private final List<StateStoreSupplier> stateStoreSuppliers;
+    private final Map<String, String> sinkNameToTopic;
 
     public ProcessorTopology(List<ProcessorNode> processorNodes,
                              Map<String, SourceNode> sourceByTopics,
+                             Map<String, SinkNode> sinkByTopics,
                              List<StateStoreSupplier> stateStoreSuppliers) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
         this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
+        this.sinkByTopics   = Collections.unmodifiableMap(sinkByTopics);
         this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers);
+
+        // pre-process sink nodes to get reverse mapping
+        sinkNameToTopic = new HashMap<>();
+        for (String topic : sinkByTopics.keySet()) {
+            SinkNode sink = sinkByTopics.get(topic);
+            sinkNameToTopic.put(sink.name(), topic);
+        }
     }
 
     public Set<String> sourceTopics() {
@@ -51,6 +61,18 @@ public class ProcessorTopology {
         return new HashSet<>(sourceByTopics.values());
     }
 
+    public Set<String> sinkTopics() {
+        return sinkByTopics.keySet();
+    }
+
+    public SinkNode sink(String topic) {
+        return sinkByTopics.get(topic);
+    }
+
+    public Set<SinkNode> sinks() {
+        return new HashSet<>(sinkByTopics.values());
+    }
+
     public List<ProcessorNode> processors() {
         return processorNodes;
     }
@@ -59,4 +81,40 @@ public class ProcessorTopology {
         return stateStoreSuppliers;
     }
 
+    private String childrenToString(List<ProcessorNode<?, ?>> children) {
+        if (children == null || children.isEmpty()) {
+            return "";
+        }
+
+        StringBuilder sb = new StringBuilder("children [");
+        for (ProcessorNode child : children) {
+            sb.append(child.name() + ",");
+        }
+        sb.setLength(sb.length() - 1);
+        sb.append("]\n");
+
+        // recursively print children
+        for (ProcessorNode child : children) {
+            sb.append("\t\t\t\t" + child.toString());
+            sb.append(childrenToString(child.children()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Produces a string representation contain useful information this topology.
+     * This is useful in debugging scenarios.
+     * @return A string representation of this instance.
+     */
+    public String toString() {
+        StringBuilder sb = new StringBuilder("ProcessorTopology:\n");
+
+        // start from sources
+        for (SourceNode source : sourceByTopics.values()) {
+            sb.append("\t\t\t\t" + source.toString());
+            sb.append(childrenToString(source.children()));
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 3795916..6907858 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -81,4 +81,13 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     public Serializer<V> valueSerializer() {
         return valSerializer;
     }
+
+    /**
+     * @return a string representation of this node, useful for debugging.
+     */
+    public String toString() {
+        StringBuilder sb = new StringBuilder(super.toString());
+        sb.append("topic:" + topic);
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index a550344..90da1de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -26,10 +26,11 @@ public class SourceNode<K, V> extends ProcessorNode<K, V>
{
     private Deserializer<K> keyDeserializer;
     private Deserializer<V> valDeserializer;
     private ProcessorContext context;
+    private String[] topics;
 
-    public SourceNode(String name, Deserializer<K> keyDeserializer, Deserializer<V>
valDeserializer) {
+    public SourceNode(String name, String[] topics, Deserializer<K> keyDeserializer,
Deserializer<V> valDeserializer) {
         super(name);
-
+        this.topics = topics;
         this.keyDeserializer = keyDeserializer;
         this.valDeserializer = valDeserializer;
     }
@@ -73,4 +74,18 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
     public Deserializer<V> valueDeserializer() {
         return valDeserializer;
     }
+
+    /**
+     * @return a string representation of this node, useful for debugging.
+     */
+    public String toString() {
+        StringBuilder sb = new StringBuilder(super.toString());
+        sb.append("topics: [");
+        for (String topic : topics) {
+            sb.append(topic + ",");
+        }
+        sb.setLength(sb.length() - 1);
+        sb.append("] ");
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 830fab6..08b4f07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -90,4 +90,13 @@ public class StandbyTask extends AbstractTask {
         // reinitialize offset limits
         initializeOffsetLimits();
     }
+
+    /**
+     * Produces a string representation contain useful information about a StreamTask.
+     * This is useful in debugging scenarios.
+     * @return A string representation of the StreamTask instance.
+     */
+    public String toString() {
+        return super.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 402c8fd..3126dd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -376,4 +376,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
+    /**
+     * Produces a string representation contain useful information about a StreamTask.
+     * This is useful in debugging scenarios.
+     * @return A string representation of the StreamTask instance.
+     */
+    public String toString() {
+        return super.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f982efa..c84cae0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -621,6 +621,39 @@ public class StreamThread extends Thread {
     }
 
 
+    /**
+     * Produces a string representation contain useful information about a StreamThread.
+     * This is useful in debugging scenarios.
+     * @return A string representation of the StreamThread instance.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("StreamsThread appId:" + this.applicationId
+ "\n");
+        sb.append("\tStreamsThread clientId:" + clientId + "\n");
+
+        // iterate and print active tasks
+        if (activeTasks != null) {
+            sb.append("\tActive tasks:\n");
+            for (TaskId tId : activeTasks.keySet()) {
+                StreamTask task = activeTasks.get(tId);
+                sb.append("\t\t" + task.toString());
+            }
+        }
+
+        // iterate and print standby tasks
+        if (standbyTasks != null) {
+            sb.append("\tStandby tasks:\n");
+            for (TaskId tId : standbyTasks.keySet()) {
+                StandbyTask task = standbyTasks.get(tId);
+                sb.append("\t\t" + task.toString());
+            }
+            sb.append("\n");
+        }
+
+        return sb.toString();
+    }
+
+
     private void removeStandbyTasks() {
         try {
             for (StandbyTask task : standbyTasks.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index a1c07af..f8c080c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -38,10 +38,11 @@ public class PartitionGroupTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
-    private final TopicPartition partition1 = new TopicPartition("topic", 1);
-    private final TopicPartition partition2 = new TopicPartition("topic", 2);
-    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(intDeserializer,
intDeserializer));
-    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(intDeserializer,
intDeserializer));
+    private final String[] topics = {"topic"};
+    private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
+    private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
+    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics,
intDeserializer, intDeserializer));
+    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics,
intDeserializer, intDeserializer));
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 8d9c91c..7870611 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -39,7 +39,8 @@ public class RecordQueueTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
-    private final RecordQueue queue = new RecordQueue(new TopicPartition("topic", 1), new
MockSourceNode<>(intDeserializer, intDeserializer));
+    private final String[] topics = {"topic"};
+    private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), new
MockSourceNode<>(topics, intDeserializer, intDeserializer));
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 39cb0a0..f3339a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -73,6 +73,7 @@ public class StandbyTaskTest {
     private final ProcessorTopology topology = new ProcessorTopology(
             Collections.<ProcessorNode>emptyList(),
             Collections.<String, SourceNode>emptyMap(),
+            Collections.<String, SinkNode>emptyMap(),
             Utils.<StateStoreSupplier>mkList(
                     new MockStateStoreSupplier(storeName1, false),
                     new MockStateStoreSupplier(storeName2, true)
@@ -84,6 +85,7 @@ public class StandbyTaskTest {
     private final ProcessorTopology ktableTopology = new ProcessorTopology(
             Collections.<ProcessorNode>emptyList(),
             Collections.<String, SourceNode>emptyMap(),
+            Collections.<String, SinkNode>emptyMap(),
             Utils.<StateStoreSupplier>mkList(
                     new MockStateStoreSupplier(ktable.topic(), true, false)
             )

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index e3f00ff..fdcf6b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -56,23 +56,25 @@ public class StreamTaskTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
-
-    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
-    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
+    private final String[] topic1 = {"topic1"};
+    private final String[] topic2 = {"topic2"};
+    private final TopicPartition partition1 = new TopicPartition(topic1[0], 1);
+    private final TopicPartition partition2 = new TopicPartition(topic2[0], 1);
     private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
 
-    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer,
intDeserializer);
-    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer,
intDeserializer);
+    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1,
intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2,
intDeserializer, intDeserializer);
     private final MockProcessorNode<Integer, Integer>  processor = new MockProcessorNode<>(10L);
 
     private final ProcessorTopology topology = new ProcessorTopology(
             Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode)
processor),
             new HashMap<String, SourceNode>() {
                 {
-                    put("topic1", source1);
-                    put("topic2", source2);
+                    put(topic1[0], source1);
+                    put(topic2[0], source2);
                 }
             },
+            Collections.<String, SinkNode>emptyMap(),
             Collections.<StateStoreSupplier>emptyList()
     );
     private File baseDir;

http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index cf0202e..176501a 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -33,8 +33,8 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
     public final ArrayList<K> keys = new ArrayList<>();
     public final ArrayList<V> values = new ArrayList<>();
 
-    public MockSourceNode(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer)
{
-        super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer);
+    public MockSourceNode(String[] topics, Deserializer<K> keyDeserializer, Deserializer<V>
valDeserializer) {
+        super(NAME + INDEX.getAndIncrement(), topics, keyDeserializer, valDeserializer);
     }
 
     @Override


Mime
View raw message