kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3642: Fix NPE from ProcessorStateManager when the changelog topic does not exist
Date Tue, 03 May 2016 17:47:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 62253539d -> 1f528815d


KAFKA-3642: Fix NPE from ProcessorStateManager when the changelog topic does not exist

Issue: https://issues.apache.org/jira/browse/KAFKA-3642

Author: Yuto Kawamura <kawamuray.dadada@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1289 from kawamuray/KAFKA-3642-streams-NPE
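
For context, a minimal standalone sketch of the failure mode this patch addresses, assuming the
post-patch MockConsumer behaviour; the class name and the "missing-changelog" topic below are
illustrative placeholders, not part of the change:

    import java.util.List;

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.PartitionInfo;

    // Illustrative sketch only: after this patch, MockConsumer#partitionsFor returns
    // null for a topic it has no metadata for, so callers such as ProcessorStateManager
    // must check for null instead of iterating the result and hitting a NullPointerException.
    public class MissingChangelogTopicSketch {
        public static void main(String[] args) {
            MockConsumer<byte[], byte[]> restoreConsumer =
                    new MockConsumer<>(OffsetResetStrategy.EARLIEST);

            // "missing-changelog" is a placeholder topic name.
            List<PartitionInfo> partitions = restoreConsumer.partitionsFor("missing-changelog");
            if (partitions == null) {
                // The patched ProcessorStateManager raises a descriptive StreamsException here
                // rather than letting the for-each loop throw an NPE.
                System.out.println("No partition info for topic: missing-changelog");
            }
        }
    }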


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f528815
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f528815
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f528815

Branch: refs/heads/trunk
Commit: 1f528815de8d0e094ee5446794ab7325629ca7ed
Parents: 6225353
Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
Authored: Tue May 3 10:47:23 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue May 3 10:47:23 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/MockConsumer.java    |  6 +----
 .../internals/ProcessorStateManager.java        |  6 ++++-
 .../internals/StreamPartitionAssignor.java      | 27 +++++++++++++-------
 .../internals/ProcessorStateManagerTest.java    |  4 +--
 4 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1f528815/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 8dce1f1..9ab4c29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -268,11 +268,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
         ensureNotClosed();
-        List<PartitionInfo> parts = this.partitions.get(topic);
-        if (parts == null)
-            return Collections.emptyList();
-        else
-            return parts;
+        return this.partitions.get(topic);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f528815/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 0cdf44c..1d97384 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -186,7 +186,11 @@ public class ProcessorStateManager {
                 // ignore
             }
 
-            for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(topic)) {
+            List<PartitionInfo> partitionInfos = restoreConsumer.partitionsFor(topic);
+            if (partitionInfos == null) {
+                throw new StreamsException("Could not find partition info for topic: " + topic);
+            }
+            for (PartitionInfo partitionInfo : partitionInfos) {
                 if (partitionInfo.partition() == partition) {
                     partitionNotFound = false;
                     break;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f528815/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 341e66a..f2eea36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -152,7 +152,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      * @param topicToTaskIds Map that contains the topic names to be created
      * @param compactTopic If true, the topic should be a compacted topic. This is used for
      *                     change log topics usually.
-     * @param outPartitionInfo If true, compute and return all partitions created
      * @param postPartitionPhase If true, the computation for calculating the number of partitions
      *                           is slightly different. Set to true after the initial topic-to-partition
      *                           assignment.
@@ -160,7 +159,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      */
     private Map<TopicPartition, PartitionInfo> prepareTopic(Map<String, Set<TaskId>> topicToTaskIds,
                                                             boolean compactTopic,
-                                                            boolean outPartitionInfo,
                                                             boolean postPartitionPhase) {
         Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
         // if ZK is specified, prepare the internal source topic before calling partition grouper
@@ -192,13 +190,24 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     partitions = streamThread.restoreConsumer.partitionsFor(topic);
                 } while (partitions == null || partitions.size() != numPartitions);
 
-                if (outPartitionInfo) {
-                    for (PartitionInfo partition : partitions)
-                        partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
-                }
+                for (PartitionInfo partition : partitions)
+                    partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
             }
 
             log.info("Completed validating internal topics in partition assignor.");
+        } else {
+            List<String> missingTopics = new ArrayList<>();
+            for (String topic : topicToTaskIds.keySet()) {
+                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                if (partitions == null) {
+                    missingTopics.add(topic);
+                }
+            }
+            if (!missingTopics.isEmpty()) {
+                log.warn("Topics {} do not exist but couldn't be created as the config '{}' isn't supplied",
+                         missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
+
+            }
         }
 
         return partitionInfos;
@@ -284,7 +293,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
-        Map<TopicPartition, PartitionInfo> internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, true, false);
+        Map<TopicPartition, PartitionInfo> internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
         internalSourceTopicToTaskIds.clear();
 
         Cluster metadataWithInternalTopics = metadata;
@@ -380,9 +389,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         // if ZK is specified, validate the internal topics again
-        prepareTopic(internalSourceTopicToTaskIds, false /* compactTopic */, false, true);
+        prepareTopic(internalSourceTopicToTaskIds, false /* compactTopic */, true);
         // change log topics should be compacted
-        prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, false, true);
+        prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, true);
 
         return assignment;
     }
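
As a usage note rather than part of the patch: a minimal sketch, with hypothetical application id,
broker, and ZooKeeper addresses, of supplying StreamsConfig.ZOOKEEPER_CONNECT_CONFIG so the partition
assignor can create missing internal topics instead of only logging the warning added above:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class ZookeeperConnectConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder values; only the config keys come from the surrounding patch context.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Without this setting, StreamPartitionAssignor cannot create internal topics
            // and only warns that they are missing.
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");

            StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getString(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG));
        }
    }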

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f528815/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index e3669e8..890af0f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -29,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
@@ -223,7 +223,7 @@ public class ProcessorStateManagerTest {
         }
     }
 
-    @Test(expected = KafkaException.class)
+    @Test(expected = StreamsException.class)
     public void testNoTopic() throws IOException {
         File baseDir = Files.createTempDirectory(stateDir).toFile();
         try {

