kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4131; Multiple Regex KStream-Consumers cause Null pointer exception
Date Fri, 16 Sep 2016 00:08:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f3bb10413 -> d43666102


KAFKA-4131; Multiple Regex KStream-Consumers cause Null pointer exception

Fix for bug outlined in KAFKA-4131

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #1843 from bbejeck/KAFKA-4131_mulitple_regex_consumers_cause_npe


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d4366610
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d4366610
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d4366610

Branch: refs/heads/trunk
Commit: d4366610241321a58d62c0b919d60ccfe3c3d698
Parents: f3bb104
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Thu Sep 15 17:08:00 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Sep 15 17:08:00 2016 -0700

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 15 ++--
 .../integration/RegexSourceIntegrationTest.java | 76 ++++++++++++++++++++
 2 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d4366610/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 65eda80..b6cebf4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -176,6 +176,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         standbyTasks.removeAll(prevTasks);
         SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
 
+        if (streamThread.builder.sourceTopicPattern() != null) {
+            SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+            log.debug("have {} topics matching regex", topics);
+            // update the topic groups with the returned subscription set for regex pattern subscriptions
+            subscriptionUpdates.updateTopics(topics);
+            streamThread.builder.updateSubscriptions(subscriptionUpdates);
+        }
+
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
@@ -255,17 +263,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
         Map<UUID, Set<String>> consumersByClient = new HashMap<>();
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-        SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
         Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
         // decode subscription info
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
-            if (streamThread.builder.sourceTopicPattern() != null) {
-               // update the topic groups with the returned subscription list for regex pattern subscriptions
-                subscriptionUpdates.updateTopics(subscription.topics());
-            }
 
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
             if (info.userEndPoint != null) {
@@ -291,7 +294,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             state.capacity = state.capacity + 1d;
         }
 
-        streamThread.builder.updateSubscriptions(subscriptionUpdates);
+
         this.topicGroups = streamThread.builder.topicGroups();
 
         // ensure the co-partitioning topics within the group have the same number of partitions,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d4366610/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index fe0a0eb..8487674 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -80,6 +80,8 @@ public class RegexSourceIntegrationTest {
     private static final String TOPIC_Z = "topic-Z";
     private static final String FA_TOPIC = "fa";
     private static final String FOO_TOPIC = "foo";
+    private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
+    private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
 
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
@@ -97,6 +99,8 @@ public class RegexSourceIntegrationTest {
         CLUSTER.createTopic(TOPIC_Z);
         CLUSTER.createTopic(FA_TOPIC);
         CLUSTER.createTopic(FOO_TOPIC);
+        CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
+        CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
 
     }
 
@@ -275,6 +279,78 @@ public class RegexSourceIntegrationTest {
         assertThat(actualValues, equalTo(expectedReceivedValues));
     }
 
+    @Test
+    public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
+
+        final Serde<String> stringSerde = Serdes.String();
+        final KStreamBuilder builderLeader = new KStreamBuilder();
+        final KStreamBuilder builderFollower = new KStreamBuilder();
+        final List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
+
+        final KStream<String, String> partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d"));
+        final KStream<String, String> partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d"));
+
+
+        partitionedStreamLeader.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        partitionedStreamFollower.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        final KafkaStreams partitionedStreamsLeader  = new KafkaStreams(builderLeader, streamsConfiguration);
+        final KafkaStreams partitionedStreamsFollower  = new KafkaStreams(builderFollower, streamsConfiguration);
+
+        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
+
+
+        final Field leaderStreamThreadsField = partitionedStreamsLeader.getClass().getDeclaredField("threads");
+        leaderStreamThreadsField.setAccessible(true);
+        final StreamThread[] leaderStreamThreads = (StreamThread[]) leaderStreamThreadsField.get(partitionedStreamsLeader);
+        final StreamThread originalLeaderThread = leaderStreamThreads[0];
+
+        final TestStreamThread leaderTestStreamThread = new TestStreamThread(builderLeader, streamsConfig,
+                new DefaultKafkaClientSupplier(),
+                originalLeaderThread.applicationId, originalLeaderThread.clientId, originalLeaderThread.processId, new Metrics(), new SystemTime());
+
+        leaderStreamThreads[0] = leaderTestStreamThread;
+
+        final TestCondition bothTopicsAddedToLeader = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return leaderTestStreamThread.assignedTopicPartitions.equals(expectedAssignment);
+            }
+        };
+
+
+
+        final Field followerStreamThreadsField = partitionedStreamsFollower.getClass().getDeclaredField("threads");
+        followerStreamThreadsField.setAccessible(true);
+        final StreamThread[] followerStreamThreads = (StreamThread[]) followerStreamThreadsField.get(partitionedStreamsFollower);
+        final StreamThread originalFollowerThread = followerStreamThreads[0];
+
+        final TestStreamThread followerTestStreamThread = new TestStreamThread(builderFollower, streamsConfig,
+                new DefaultKafkaClientSupplier(),
+                originalFollowerThread.applicationId, originalFollowerThread.clientId, originalFollowerThread.processId, new Metrics(), new SystemTime());
+
+        followerStreamThreads[0] = followerTestStreamThread;
+
+
+        final TestCondition bothTopicsAddedToFollower = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return followerTestStreamThread.assignedTopicPartitions.equals(expectedAssignment);
+            }
+        };
+
+        partitionedStreamsLeader.start();
+        TestUtils.waitForCondition(bothTopicsAddedToLeader, "Topics never assigned to leader stream");
+
+
+        partitionedStreamsFollower.start();
+        TestUtils.waitForCondition(bothTopicsAddedToFollower, "Topics never assigned to follower stream");
+
+        partitionedStreamsLeader.close();
+        partitionedStreamsFollower.close();
+
+    }
+
     // TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
     @Test(expected = AssertionError.class)
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {


Mime
View raw message