kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: MINOR: refactor build method to extract methods from if statements
Date Tue, 12 Sep 2017 08:26:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a67140317 -> e1491d4a0


MINOR: refactor build method to extract methods from if statements

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3833 from bbejeck/MINOR_extract_methods_from_build


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1491d4a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1491d4a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1491d4a

Branch: refs/heads/trunk
Commit: e1491d4a0463deaaa8de7e100dddc2edbc030abf
Parents: a671403
Author: Bill Bejeck <bill@confluent.io>
Authored: Tue Sep 12 09:26:09 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Tue Sep 12 09:26:09 2017 +0100

----------------------------------------------------------------------
 .../internals/InternalTopologyBuilder.java      | 116 ++++++++++++-------
 1 file changed, 72 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1491d4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 437e9e5..193d0e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -898,54 +898,21 @@ public class InternalTopologyBuilder {
                 processorMap.put(node.name(), node);
 
                 if (factory instanceof ProcessorNodeFactory) {
-                    for (final String predecessor : ((ProcessorNodeFactory) factory).predecessors)
{
-                        final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor);
-                        predecessorNode.addChild(node);
-                    }
-                    for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames)
{
-                        if (!stateStoreMap.containsKey(stateStoreName)) {
-                            if (stateFactories.containsKey(stateStoreName)) {
-                                final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
-
-                                // remember the changelog topic if this state store is change-logging
enabled
-                                if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName))
{
-                                    final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId,
stateStoreName);
-                                    storeToChangelogTopic.put(stateStoreName, changelogTopic);
-                                }
-                                stateStoreMap.put(stateStoreName, stateStoreFactory.build());
-                            } else {
-                                stateStoreMap.put(stateStoreName, globalStateStores.get(stateStoreName));
-                            }
-
+                    buildProcessorNode(processorMap,
+                                       stateStoreMap,
+                                       (ProcessorNodeFactory) factory,
+                                       node);
 
-                        }
-                    }
                 } else if (factory instanceof SourceNodeFactory) {
-                    final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
-                    final List<String> topics = (sourceNodeFactory.pattern != null)
?
-                            sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates())
:
-                            sourceNodeFactory.topics;
+                    buildSourceNode(topicSourceMap,
+                                    (SourceNodeFactory) factory,
+                                    (SourceNode) node);
 
-                    for (final String topic : topics) {
-                        if (internalTopicNames.contains(topic)) {
-                            // prefix the internal topic name with the application id
-                            topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
-                        } else {
-                            topicSourceMap.put(topic, (SourceNode) node);
-                        }
-                    }
                 } else if (factory instanceof SinkNodeFactory) {
-                    final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory;
-
-                    for (final String predecessor : sinkNodeFactory.predecessors) {
-                        processorMap.get(predecessor).addChild(node);
-                        if (internalTopicNames.contains(sinkNodeFactory.topic)) {
-                            // prefix the internal topic name with the application id
-                            topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode)
node);
-                        } else {
-                            topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
-                        }
-                    }
+                    buildSinkNode(processorMap,
+                                  topicSinkMap,
+                                  (SinkNodeFactory) factory,
+                                  node);
                 } else {
                     throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                 }
@@ -955,6 +922,67 @@ public class InternalTopologyBuilder {
         return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()),
storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
     }
 
+    private void buildSinkNode(final Map<String, ProcessorNode> processorMap,
+                               final Map<String, SinkNode> topicSinkMap,
+                               final SinkNodeFactory sinkNodeFactory,
+                               final ProcessorNode node) {
+
+        for (final String predecessor : sinkNodeFactory.predecessors) {
+            processorMap.get(predecessor).addChild(node);
+            if (internalTopicNames.contains(sinkNodeFactory.topic)) {
+                // prefix the internal topic name with the application id
+                topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
+            } else {
+                topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
+            }
+        }
+    }
+
+    private void buildSourceNode(final Map<String, SourceNode> topicSourceMap,
+                                 final SourceNodeFactory sourceNodeFactory,
+                                 final SourceNode node) {
+
+        final List<String> topics = (sourceNodeFactory.pattern != null) ?
+                                    sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates())
:
+                                    sourceNodeFactory.topics;
+
+        for (final String topic : topics) {
+            if (internalTopicNames.contains(topic)) {
+                // prefix the internal topic name with the application id
+                topicSourceMap.put(decorateTopic(topic), node);
+            } else {
+                topicSourceMap.put(topic, node);
+            }
+        }
+    }
+
+    private void buildProcessorNode(final Map<String, ProcessorNode> processorMap,
+                                    final Map<String, StateStore> stateStoreMap,
+                                    final ProcessorNodeFactory factory,
+                                    final ProcessorNode node) {
+
+        for (final String predecessor : factory.predecessors) {
+            final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor);
+            predecessorNode.addChild(node);
+        }
+        for (final String stateStoreName : factory.stateStoreNames) {
+            if (!stateStoreMap.containsKey(stateStoreName)) {
+                if (stateFactories.containsKey(stateStoreName)) {
+                    final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+
+                    // remember the changelog topic if this state store is change-logging
enabled
+                    if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName))
{
+                        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId,
stateStoreName);
+                        storeToChangelogTopic.put(stateStoreName, changelogTopic);
+                    }
+                    stateStoreMap.put(stateStoreName, stateStoreFactory.build());
+                } else {
+                    stateStoreMap.put(stateStoreName, globalStateStores.get(stateStoreName));
+                }
+            }
+        }
+    }
+
     /**
      * Get any global {@link StateStore}s that are part of the
      * topology


Mime
View raw message