kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5226; Fixes issue where adding topics matching a regex
Date Thu, 01 Jun 2017 02:36:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 681c6fc1f -> 6360e04e7


KAFKA-5226; Fixes issue where adding topics matching a regex-subscribed stream may not be detected by all followers until onJoinComplete returns.

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3157 from bbejeck/KAFKA-5226_null_pointer_source_node_deserialize


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6360e04e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6360e04e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6360e04e

Branch: refs/heads/trunk
Commit: 6360e04e70b8f5ce562c2f24f85208aaae788363
Parents: 681c6fc
Author: Bill Bejeck <bill@confluent.io>
Authored: Wed May 31 19:36:08 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 31 19:36:08 2017 -0700

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      |  4 +
 .../internals/StreamPartitionAssignor.java      | 33 +++++--
 .../processor/internals/StreamThread.java       |  1 -
 .../streams/processor/TopologyBuilderTest.java  |  6 +-
 .../internals/StreamPartitionAssignorTest.java  | 15 ++++
 .../processor/internals/StreamThreadTest.java   | 91 ++++++++++++++++++++
 6 files changed, 140 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6360e04e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index f9a0397..c3614bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -1532,6 +1532,10 @@ public class TopologyBuilder {
         return applicationId + "-" + topic;
     }
 
+    public SubscriptionUpdates subscriptionUpdates() {
+        return subscriptionUpdates;
+    }
+
     public synchronized Pattern sourceTopicPattern() {
         if (this.topicPattern == null) {
             final List<String> allSourceTopics = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/6360e04e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index abcabf0..9d5d4cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -253,17 +253,23 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         standbyTasks.removeAll(previousActiveTasks);
         SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint);
 
-        if (streamThread.builder.sourceTopicPattern() != null) {
-            SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-            log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(), topics);
-            // update the topic groups with the returned subscription set for regex pattern subscriptions
-            subscriptionUpdates.updateTopics(topics);
-            streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
+        if (streamThread.builder.sourceTopicPattern() != null &&
+            !streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) {
+            updateSubscribedTopics(topics);
         }
 
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
+    private void updateSubscribedTopics(Set<String> topics) {
+        SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        log.debug("stream-thread [{}] found {} topics possibly matching regex",
+                  streamThread.getName(), topics);
+        // update the topic groups with the returned subscription set for regex pattern subscriptions
+        subscriptionUpdates.updateTopics(topics);
+        streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
+    }
+
     /*
      * This assigns tasks to consumer clients in the following steps.
      *
@@ -606,6 +612,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
         metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo);
+
+        checkForNewTopicAssignments(assignment);
+    }
+
+    private void checkForNewTopicAssignments(Assignment assignment) {
+        if (streamThread.builder.sourceTopicPattern() != null) {
+            final Set<String> assignedTopics = new HashSet<>();
+            for (final TopicPartition topicPartition : assignment.partitions()) {
+                assignedTopics.add(topicPartition.topic());
+            }
+            if (!streamThread.builder.subscriptionUpdates().getUpdates().containsAll(assignedTopics)) {
+                assignedTopics.addAll(streamThread.builder.subscriptionUpdates().getUpdates());
+                updateSubscribedTopics(assignedTopics);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/6360e04e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1e73c89..a453e49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1456,5 +1456,4 @@ public class StreamThread extends Thread {
 
         return firstException;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6360e04e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 0b60b00..c0ce9e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
@@ -681,7 +681,7 @@ public class TopologyBuilderTest {
         builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
         builder.addSource("source-3", Pattern.compile("topic-\\d"));
 
-        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
         Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 
@@ -766,7 +766,7 @@ public class TopologyBuilderTest {
                 .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
                 .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
 
-        final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
         final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6360e04e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 9937ad4..17eb50a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -44,6 +45,7 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -104,6 +106,12 @@ public class StreamPartitionAssignorTest {
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
     private final TopologyBuilder builder = new TopologyBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps());
+    private final StreamThread mockStreamThread = new StreamThread(builder, config,
+                                                                   mockClientSupplier, "appID",
+                                                                   "clientId", UUID.randomUUID(),
+                                                                   new Metrics(), new MockTime(),
+                                                                   null, 1L);
+    private final Map<String, Object> configurationMap = new HashMap<>();
 
     private Properties configProps() {
         return new Properties() {
@@ -116,6 +124,13 @@ public class StreamPartitionAssignorTest {
         };
     }
 
+    @Before
+    public void setUp() {
+        configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, mockStreamThread);
+        configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
+        partitionAssignor.configure(configurationMap);
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testSubscription() throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6360e04e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e255350..8205c27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,6 +36,9 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
@@ -52,6 +55,7 @@ import java.io.File;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -1499,6 +1503,93 @@ public class StreamThreadTest {
         assertFalse(testStreamTask.committed);
     }
 
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
+        final TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.addSource("source", Pattern.compile("t.*"));
+        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source");
+
+        final StreamThread thread = new StreamThread(
+            topologyBuilder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        final Map<String, Object> configurationMap = new HashMap<>();
+
+        configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread);
+        configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
+        partitionAssignor.configure(configurationMap);
+
+        thread.setPartitionAssignor(partitionAssignor);
+
+        final Field
+            nodeToSourceTopicsField =
+            topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
+        nodeToSourceTopicsField.setAccessible(true);
+        final Map<String, List<String>>
+            nodeToSourceTopics =
+            (Map<String, List<String>>) nodeToSourceTopicsField.get(topologyBuilder);
+        final List<TopicPartition> topicPartitions = new ArrayList<>();
+
+        final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
+        final TopicPartition topicPartition2 = new TopicPartition("topic-2", 0);
+        final TopicPartition topicPartition3 = new TopicPartition("topic-3", 0);
+
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 0);
+        final TaskId taskId3 = new TaskId(0, 0);
+
+        List<TaskId> activeTasks = Arrays.asList(taskId1);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
+
+        topicPartitions.addAll(Arrays.asList(topicPartition1));
+        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
+        partitionAssignor.onAssignment(assignment);
+
+        assertTrue(nodeToSourceTopics.get("source").size() == 1);
+        assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
+
+        topicPartitions.clear();
+
+        activeTasks = Arrays.asList(taskId1, taskId2);
+        info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
+        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2));
+        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
+        partitionAssignor.onAssignment(assignment);
+
+        assertTrue(nodeToSourceTopics.get("source").size() == 2);
+        assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
+        assertTrue(nodeToSourceTopics.get("source").contains("topic-2"));
+
+        topicPartitions.clear();
+
+        activeTasks = Arrays.asList(taskId1, taskId2, taskId3);
+        info = new AssignmentInfo(activeTasks, standbyTasks,
+                               new HashMap<HostInfo, Set<TopicPartition>>());
+        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3));
+        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
+        partitionAssignor.onAssignment(assignment);
+
+        assertTrue(nodeToSourceTopics.get("source").size() == 3);
+        assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
+        assertTrue(nodeToSourceTopics.get("source").contains("topic-2"));
+        assertTrue(nodeToSourceTopics.get("source").contains("topic-3"));
+
+    }
+
     private void initPartitionGrouper(final StreamsConfig config,
                                       final StreamThread thread,
                                       final MockClientSupplier clientSupplier) {


Mime
View raw message