kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Maybe decorate inner topics for SourceNode
Date Tue, 10 Jan 2017 04:17:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 64514ff5e -> 109aa3536


MINOR: Maybe decorate inner topics for SourceNode

When creating the source node in TopologyBuilder, we need to decorate its input topics if
they are inner (i.e. repartition) topics with the prefix.

Also did some minor cleanup in the printing function for better visualization in debugging.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Eno Thereska, Damian Guy, Eno Thereska, Jun Rao

Closes #2320 from guozhangwang/KMinor-source-topic-fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/109aa353
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/109aa353
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/109aa353

Branch: refs/heads/trunk
Commit: 109aa353686cc0a9e4448c2b48da63f1d79acae6
Parents: 64514ff
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Jan 9 20:17:16 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 9 20:17:16 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 17 ++++++++--
 .../streams/processor/TopologyBuilder.java      | 26 ++++++++++++---
 .../processor/internals/AbstractTask.java       | 26 ++++++++++-----
 .../processor/internals/ProcessorNode.java      | 22 ++++++++----
 .../processor/internals/ProcessorTopology.java  | 35 ++++++++++++--------
 .../streams/processor/internals/SinkNode.java   | 17 ++++++++--
 .../streams/processor/internals/SourceNode.java | 24 ++++++++++----
 .../processor/internals/StreamThread.java       | 25 +++++++++-----
 8 files changed, 138 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 548e594..d2f388c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -384,15 +384,26 @@ public class KafkaStreams {
     }
 
     /**
-     * Produces a string representation contain useful information about Kafka Streams
+     * Produces a string representation containing useful information about Kafka Streams
      * Such as thread IDs, task IDs and a representation of the topology. This is useful
      * in debugging scenarios.
      * @return A string representation of the Kafka Streams instance.
      */
+    @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("KafkaStreams processID:" + processId
+ "\n");
+        return toString("");
+    }
+
+    /**
+     * Produces a string representation containing useful information about Kafka Streams
+     * such as thread IDs, task IDs and a representation of the topology starting with the
given indent. This is useful
+     * in debugging scenarios.
+     * @return A string representation of the Kafka Streams instance.
+     */
+    public String toString(final String indent) {
+        final StringBuilder sb = new StringBuilder(indent + "KafkaStreams processID:" + processId
+ "\n");
         for (final StreamThread thread : threads) {
-            sb.append("\t").append(thread.toString());
+            sb.append(thread.toString(indent + "\t"));
         }
         sb.append("\n");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ca6b7b7..b055d52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -162,11 +162,17 @@ public class TopologyBuilder {
             this.valDeserializer = valDeserializer;
         }
 
-        public String[] getTopics() {
+        String[] getTopics() {
             return topics;
         }
 
-        public String[] getTopics(Collection<String> subscribedTopics) {
+        String[] getTopics(Collection<String> subscribedTopics) {
+            // if it is subscribed via patterns, it is possible that the topic metadata has
not been updated
+            // yet and hence the map from source node to topics is stale, in this case we
put the pattern as a place holder;
+            // this should only happen for debugging since during runtime this function should
always be called after the metadata has updated.
+            if (subscribedTopics.isEmpty())
+                return new String[] {"Pattern[" + pattern + "]"};
+
             List<String> matchedTopics = new ArrayList<>();
             for (String update : subscribedTopics) {
                 if (this.pattern == topicToPatterns.get(update)) {
@@ -188,7 +194,15 @@ public class TopologyBuilder {
         @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
-            return new SourceNode(name, nodeToSourceTopics.get(name), keyDeserializer, valDeserializer);
+            final String[] sourceTopics = nodeToSourceTopics.get(name);
+
+            // if it is subscribed via patterns, it is possible that the topic metadata has
not been updated
+            // yet and hence the map from source node to topics is stale, in this case we
put the pattern as a place holder;
+            // this should only happen for debugging since during runtime this function should
always be called after the metadata has updated.
+            if (sourceTopics == null)
+                return new SourceNode(name, new String[] {"Pattern[" + pattern + "]"}, keyDeserializer,
valDeserializer);
+            else
+                return new SourceNode(name, maybeDecorateInternalSourceTopics(sourceTopics).toArray(new
String[sourceTopics.length]), keyDeserializer, valDeserializer);
         }
 
         private boolean isMatch(String topic) {
@@ -727,7 +741,10 @@ public class TopologyBuilder {
         int nodeGroupId = 0;
 
         // Go through source nodes first. This makes the group id assignment easy to predict
in tests
-        for (String nodeName : Utils.sorted(nodeToSourceTopics.keySet())) {
+        final HashSet<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet());
+        allSourceNodes.addAll(nodeToSourcePatterns.keySet());
+
+        for (String nodeName : Utils.sorted(allSourceNodes)) {
             String root = nodeGrouper.root(nodeName);
             Set<String> nodeGroup = rootToNodeGroup.get(root);
             if (nodeGroup == null) {
@@ -805,6 +822,7 @@ public class TopologyBuilder {
                     String[] topics = (sourceNodeFactory.pattern != null) ?
                             sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates())
:
                             sourceNodeFactory.getTopics();
+
                     for (String topic : topics) {
                         if (internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id

http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 622426d..b438b75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -157,29 +157,37 @@ public abstract class AbstractTask {
     }
 
     /**
-     * Produces a string representation contain useful information about a StreamTask.
+     * Produces a string representation containing useful information about a StreamTask.
      * This is useful in debugging scenarios.
      * @return A string representation of the StreamTask instance.
      */
+    @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder("StreamsTask taskId:" + this.id() + "\n");
+        return toString("");
+    }
+
+    /**
+     * Produces a string representation containing useful information about a StreamTask
starting with the given indent.
+     * This is useful in debugging scenarios.
+     * @return A string representation of the StreamTask instance.
+     */
+    public String toString(final String indent) {
+        final StringBuilder sb = new StringBuilder(indent + "StreamsTask taskId: " + this.id()
+ "\n");
 
         // print topology
         if (topology != null) {
-            sb.append("\t\t\t" + topology.toString());
+            sb.append(indent).append(topology.toString(indent + "\t"));
         }
 
         // print assigned partitions
         if (partitions != null && !partitions.isEmpty()) {
-            sb.append("\t\t\tPartitions [");
+            sb.append(indent).append("Partitions [");
             for (TopicPartition topicPartition : partitions) {
-                sb.append(topicPartition.toString() + ",");
+                sb.append(topicPartition.toString()).append(", ");
             }
-            sb.setLength(sb.length() - 1);
-            sb.append("]");
+            sb.setLength(sb.length() - 2);
+            sb.append("]\n");
         }
-
-        sb.append("\n");
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index d777a4b..f7f8e8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -85,16 +85,24 @@ public class ProcessorNode<K, V> {
     /**
      * @return a string representation of this node, useful for debugging.
      */
+    @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder("");
-        sb.append(name + ": ");
+        return toString("");
+    }
+
+    /**
+     * @return a string representation of this node starting with the given indent, useful
for debugging.
+     */
+    public String toString(String indent) {
+        final StringBuilder sb = new StringBuilder(indent + name + ":\n");
         if (stateStores != null && !stateStores.isEmpty()) {
-            sb.append("stateStores [");
-            for (String store : (Set<String>) stateStores) {
-                sb.append(store + ",");
+            sb.append(indent).append("\tstates:\t\t[");
+            for (String store : stateStores) {
+                sb.append(store);
+                sb.append(", ");
             }
-            sb.setLength(sb.length() - 1);
-            sb.append("] ");
+            sb.setLength(sb.length() - 2);  // remove the last comma
+            sb.append("]\n");
         }
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 04c0261..9ccc252 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -86,40 +86,49 @@ public class ProcessorTopology {
         return storeToProcessorNodeMap;
     }
 
-    private String childrenToString(List<ProcessorNode<?, ?>> children) {
+    private String childrenToString(String indent, List<ProcessorNode<?, ?>>
children) {
         if (children == null || children.isEmpty()) {
             return "";
         }
 
-        StringBuilder sb = new StringBuilder("children [");
+        StringBuilder sb = new StringBuilder(indent + "\tchildren:\t[");
         for (ProcessorNode child : children) {
-            sb.append(child.name() + ",");
+            sb.append(child.name());
+            sb.append(", ");
         }
-        sb.setLength(sb.length() - 1);
+        sb.setLength(sb.length() - 2);  // remove the last comma
         sb.append("]\n");
 
         // recursively print children
-        for (ProcessorNode child : children) {
-            sb.append("\t\t\t\t" + child.toString());
-            sb.append(childrenToString(child.children()));
+        for (ProcessorNode<?, ?> child : children) {
+            sb.append(child.toString(indent)).append(childrenToString(indent, child.children()));
         }
         return sb.toString();
     }
 
     /**
-     * Produces a string representation contain useful information this topology.
+     * Produces a string representation containing useful information this topology starting
with the given indent.
      * This is useful in debugging scenarios.
      * @return A string representation of this instance.
      */
+    @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder("ProcessorTopology:\n");
+        return toString("");
+    }
+
+    /**
+     * Produces a string representation containing useful information this topology.
+     * This is useful in debugging scenarios.
+     * @return A string representation of this instance.
+     */
+    public String toString(final String indent) {
+        final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (SourceNode source : sourceByTopics.values()) {
-            sb.append("\t\t\t\t" + source.toString());
-            sb.append(childrenToString(source.children()));
-            sb.append("\n");
+        for (SourceNode<?, ?> source : sourceByTopics.values()) {
+            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t",
source.children()));
         }
         return sb.toString();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e7f32b4..65bad39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -77,7 +77,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         }
 
         try {
-            collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value),
keySerializer, valSerializer, partitioner);
+            collector.send(new ProducerRecord<>(topic, null, timestamp, key, value),
keySerializer, valSerializer, partitioner);
         } catch (ClassCastException e) {
             throw new StreamsException(
                     String.format("A serializer (key: %s / value: %s) is not compatible to
the actual key or value type " +
@@ -99,9 +99,20 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     /**
      * @return a string representation of this node, useful for debugging.
      */
+    @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder(super.toString());
-        sb.append("topic:" + topic);
+        return toString("");
+    }
+
+    /**
+     * @return a string representation of this node starting with the given indent, useful
for debugging.
+     */
+    public String toString(String indent) {
+        final StringBuilder sb = new StringBuilder(super.toString(indent));
+        sb.append(indent).append("\ttopic:\t\t");
+        sb.append(topic);
+        sb.append("\n");
         return sb.toString();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index e17509b..ad9c213 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -35,11 +35,11 @@ public class SourceNode<K, V> extends ProcessorNode<K, V>
{
         this.valDeserializer = valDeserializer;
     }
 
-    public K deserializeKey(String topic, byte[] data) {
+    K deserializeKey(String topic, byte[] data) {
         return keyDeserializer.deserialize(topic, data);
     }
 
-    public V deserializeValue(String topic, byte[] data) {
+    V deserializeValue(String topic, byte[] data) {
         return valDeserializer.deserialize(topic, data);
     }
 
@@ -74,14 +74,24 @@ public class SourceNode<K, V> extends ProcessorNode<K, V>
{
     /**
      * @return a string representation of this node, useful for debugging.
      */
+    @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder(super.toString());
-        sb.append("topics: [");
+        return toString("");
+    }
+
+    /**
+     * @return a string representation of this node starting with the given indent, useful
for debugging.
+     */
+    public String toString(String indent) {
+        final StringBuilder sb = new StringBuilder(super.toString(indent));
+        sb.append(indent).append("\ttopics:\t\t[");
         for (String topic : topics) {
-            sb.append(topic + ",");
+            sb.append(topic);
+            sb.append(", ");
         }
-        sb.setLength(sb.length() - 1);
-        sb.append("] ");
+        sb.setLength(sb.length() - 2);  // remove the last comma
+        sb.append("]\n");
         return sb.toString();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/109aa353/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 626de7c..6f58cc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1025,31 +1025,40 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Produces a string representation contain useful information about a StreamThread.
+     * Produces a string representation containing useful information about a StreamThread.
      * This is useful in debugging scenarios.
      * @return A string representation of the StreamThread instance.
      */
     @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder("StreamsThread appId:" + this.applicationId
+ "\n");
-        sb.append("\tStreamsThread clientId:" + clientId + "\n");
-        sb.append("\tStreamsThread threadId:" + this.getName() + "\n");
+        return toString("");
+    }
+
+    /**
+     * Produces a string representation containing useful information about a StreamThread,
starting with the given indent.
+     * This is useful in debugging scenarios.
+     * @return A string representation of the StreamThread instance.
+     */
+    public String toString(final String indent) {
+        final StringBuilder sb = new StringBuilder(indent + "StreamsThread appId: " + this.applicationId
+ "\n");
+        sb.append(indent).append("\tStreamsThread clientId: ").append(clientId).append("\n");
+        sb.append(indent).append("\tStreamsThread threadId: ").append(this.getName()).append("\n");
 
         // iterate and print active tasks
         if (activeTasks != null) {
-            sb.append("\tActive tasks:\n");
+            sb.append(indent).append("\tActive tasks:\n");
             for (TaskId tId : activeTasks.keySet()) {
                 StreamTask task = activeTasks.get(tId);
-                sb.append("\t\t" + task.toString());
+                sb.append(indent).append(task.toString(indent + "\t\t"));
             }
         }
 
         // iterate and print standby tasks
         if (standbyTasks != null) {
-            sb.append("\tStandby tasks:\n");
+            sb.append(indent).append("\tStandby tasks:\n");
             for (TaskId tId : standbyTasks.keySet()) {
                 StandbyTask task = standbyTasks.get(tId);
-                sb.append("\t\t" + task.toString());
+                sb.append(indent).append(task.toString(indent + "\t\t"));
             }
             sb.append("\n");
         }


Mime
View raw message