kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: Avoid NPE in StreamsPartitionAssignor
Date Thu, 03 Mar 2016 16:57:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 079c88178 -> 1d19ac9fe


HOTFIX: Avoid NPE in StreamsPartitionAssignor

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Michael G. Noll <michael@confluent.io>

Closes #1004 from guozhangwang/KStreamPANPE


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d19ac9f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d19ac9f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d19ac9f

Branch: refs/heads/trunk
Commit: 1d19ac9fea5b3344a0cdd00ee8c58ab8a22a5579
Parents: 079c881
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Mar 3 08:57:21 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 3 08:57:21 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 35 ++++++++++----------
 1 file changed, 17 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d19ac9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 7d89573..1b3bf10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -188,6 +188,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         // ensure the co-partitioning topics within the group have the same number of partitions,
         // and enforce the number of partitions for those internal topics.
+        internalSourceTopicToTaskIds = new HashMap<>();
         Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
         Map<Integer, Set<String>> internalSourceTopicGroups = new HashMap<>();
         for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet())
{
@@ -229,37 +230,35 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         if (internalTopicManager != null) {
             log.debug("Starting to validate internal source topics in partition assignor.");
 
-            if (internalSourceTopicToTaskIds != null) {
-                for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet())
{
-                    String topic = streamThread.jobId + "-" + entry.getKey();
+            for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet())
{
+                String topic = streamThread.jobId + "-" + entry.getKey();
 
-                    // should have size 1 only
-                    int numPartitions = -1;
-                    for (TaskId task : entry.getValue()) {
-                        numPartitions = task.partition;
-                    }
+                // should have size 1 only
+                int numPartitions = -1;
+                for (TaskId task : entry.getValue()) {
+                    numPartitions = task.partition;
+                }
 
-                    internalTopicManager.makeReady(topic, numPartitions);
+                internalTopicManager.makeReady(topic, numPartitions);
 
-                    // wait until the topic metadata has been propagated to all brokers
-                    List<PartitionInfo> partitions;
-                    do {
-                        partitions = streamThread.restoreConsumer.partitionsFor(topic);
-                    } while (partitions == null || partitions.size() != numPartitions);
+                // wait until the topic metadata has been propagated to all brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                } while (partitions == null || partitions.size() != numPartitions);
 
-                    metadata.update(topic, partitions);
-                }
+                metadata.update(topic, partitions);
             }
 
             log.info("Completed validating internal source topics in partition assignor.");
         }
+        internalSourceTopicToTaskIds.clear();
 
         // get the tasks as partition groups from the partition grouper
         Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups,
metadata);
 
-        // add tasks to state topic subscribers
+        // add tasks to state change log topic subscribers
         stateChangelogTopicToTaskIds = new HashMap<>();
-        internalSourceTopicToTaskIds = new HashMap<>();
         for (TaskId task : partitionsForTask.keySet()) {
             for (String topicName : topicGroups.get(task.topicGroupId).stateChangelogTopics)
{
                 Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topicName);


Mime
View raw message