kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4060 and KAFKA-4476 follow up: remove unnecessary InternalTopicManager branch and fixed one copartitioning bug
Date Sat, 21 Jan 2017 21:18:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 c3f923cbf -> db57c1a0c


KAFKA-4060 and KAFKA-4476 follow up: remove unnecessary InternalTopicManager branch and fixed
one copartitioning bug

The ZK removal revealed a bug in `StreamPartitionAssignor` but did not fix it properly. This is
a follow-up bug fix.

Issue:
 - If topic metadata is missing, `StreamPartitionAssigner` should not create any affected
tasks that consume topics with missing metadata.
 - Dependent downstream tasks should not be created either.
 - For tasks that are not created, no store changelog topics (if any) should get created
 - For tasks that write output to not-yet existing internal repartitioning topics, those repartitioning
topics should not get created

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2404 from mjsax/kafka-4060-zk-test-follow-up

(cherry picked from commit 0b99bea590842018e8e97e7fd1c71b1471db4d08)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db57c1a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db57c1a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db57c1a0

Branch: refs/heads/0.10.2
Commit: db57c1a0c7e057e19ec165d3cf9fa35d54b97c8f
Parents: c3f923c
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Sat Jan 21 13:18:39 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Jan 21 13:18:47 2017 -0800

----------------------------------------------------------------------
 .../processor/DefaultPartitionGrouper.java      |   2 +
 .../internals/StreamPartitionAssignor.java      |  59 +++++------
 .../internals/StreamPartitionAssignorTest.java  | 103 ++++++++++++-------
 3 files changed, 90 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/db57c1a0/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 1da1209..2f354d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +84,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
 
             if (partitions == null) {
                 log.info("Skipping assigning topic {} to tasks since its metadata is not
available yet", topic);
+                maxNumPartitions = StreamPartitionAssignor.NOT_AVAILABLE;
             } else {
                 int numPartitions = partitions.size();
                 if (numPartitions > maxNumPartitions)

http://git-wip-us.apache.org/repos/asf/kafka/blob/db57c1a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 7b48a6f..382c4e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -59,7 +59,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
 
-    public final static int UNKNOWN = -1;
+    private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
 
     private static class AssignedPartition implements Comparable<AssignedPartition>
{
@@ -159,7 +159,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<TaskId, Set<TopicPartition>> activeTasks;
 
-    InternalTopicManager internalTopicManager;
+    private InternalTopicManager internalTopicManager;
 
     /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
@@ -369,9 +369,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // create these topics if necessary
         prepareTopic(repartitionTopicMetadata);
 
-        metadataWithInternalTopics = metadata;
-        if (internalTopicManager != null)
-            metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
+        metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
 
         log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.",
streamThread.getName(), allRepartitionTopicPartitions.values());
 
@@ -592,43 +590,27 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      *
      * @param topicPartitions Map that contains the topic names to be created with the number
of partitions
      */
-    private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
+    private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions)
{
         log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.",
streamThread.getName());
 
-        // if ZK is specified, prepare the internal source topic before calling partition
grouper
-        if (internalTopicManager != null) {
-            for (Map.Entry<String, InternalTopicMetadata> entry : topicPartitions.entrySet())
{
-                InternalTopicConfig topic = entry.getValue().config;
-                Integer numPartitions = entry.getValue().numPartitions;
+        for (final Map.Entry<String, InternalTopicMetadata> entry : topicPartitions.entrySet())
{
+            final InternalTopicConfig topic = entry.getValue().config;
+            final Integer numPartitions = entry.getValue().numPartitions;
 
-                if (numPartitions == NOT_AVAILABLE) {
-                    continue;
-                }
-                if (numPartitions < 0) {
-                    throw new TopologyBuilderException(String.format("stream-thread [%s]
Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
-                }
-
-                internalTopicManager.makeReady(topic, numPartitions);
-
-                // wait until the topic metadata has been propagated to all brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
-                } while (partitions == null || partitions.size() != numPartitions);
+            if (numPartitions == NOT_AVAILABLE) {
+                continue;
             }
-        } else {
-            List<String> missingTopics = new ArrayList<>();
-            for (String topic : topicPartitions.keySet()) {
-                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic);
-                if (partitions == null) {
-                    missingTopics.add(topic);
-                }
+            if (numPartitions < 0) {
+                throw new TopologyBuilderException(String.format("stream-thread [%s] Topic
[%s] number of partitions not defined", streamThread.getName(), topic.name()));
             }
 
-            if (!missingTopics.isEmpty()) {
-                log.warn("stream-thread [{}] Topic {} do not exists but couldn't created
as the config '{}' isn't supplied",
-                        streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
-            }
+            internalTopicManager.makeReady(topic, numPartitions);
+
+            // wait until the topic metadata has been propagated to all brokers
+            List<PartitionInfo> partitions;
+            do {
+                partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
+            } while (partitions == null || partitions.size() != numPartitions);
         }
 
         log.info("stream-thread [{}] Completed validating internal topics in partition assignor",
streamThread.getName());
@@ -661,6 +643,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     Arrays.sort(topics);
                     throw new TopologyBuilderException(String.format("stream-thread [%s]
Topics not co-partitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics),
",")));
                 }
+            } else {
+                if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE)
{
+                    numPartitions = NOT_AVAILABLE;
+                    break;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/db57c1a0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index c212f14..eff2179 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -38,23 +39,23 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockStateStoreSupplier;
-import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.Properties;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.ArrayList;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -91,7 +92,7 @@ public class StreamPartitionAssignorTest {
             new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos,
Collections.<String>emptySet(),
+    private Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()),
infos, Collections.<String>emptySet(),
             Collections.<String>emptySet());
 
     private final TaskId task0 = new TaskId(0, 0);
@@ -248,17 +249,17 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(),
"test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
0);
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test",
client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer));
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
             new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(),
Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
-        // TODO: Update the code accordingly,
-        // This line was added to fix the test failure since internalTopicManager is created
in the config method all the time.
-        partitionAssignor.internalTopicManager = null;
+
         // will throw exception if it fails
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata,
subscriptions);
 
@@ -285,7 +286,7 @@ public class StreamPartitionAssignorTest {
 
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
-        final  Cluster emptyMetadata = new Cluster("cluster", Arrays.asList(Node.noNode()),
+        final  Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()),
             Collections.<PartitionInfo>emptySet(),
             Collections.<String>emptySet(),
             Collections.<String>emptySet());
@@ -443,9 +444,6 @@ public class StreamPartitionAssignorTest {
         subscriptions.put("consumer20",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(),
Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
-        // TODO: Update the code accordingly,
-        // This line was added to fix the test failure since internalTopicManager is created
in the config method all the time.
-        partitionAssignor.internalTopicManager = null;
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata,
subscriptions);
 
         // check assigned partition size: since there is no previous task and there are two
sub-topologies the assignment is random so we cannot check exact match
@@ -633,7 +631,7 @@ public class StreamPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks,
emptyTasks, userEndPoint).encode()));
 
@@ -676,11 +674,11 @@ public class StreamPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks,
emptyTasks, userEndPoint).encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata,
subscriptions);
+        partitionAssignor.assign(metadata, subscriptions);
 
         // check prepared internal topics
         assertEquals(2, internalTopicManager.readyTopics.size());
@@ -742,7 +740,7 @@ public class StreamPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config,
clientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer1",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks,
emptyTasks, myEndPoint).encode()));
 
@@ -815,7 +813,7 @@ public class StreamPartitionAssignorTest {
     @Test
     public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
-        List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
+        List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic",
0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
                 Collections.singletonMap(new HostInfo("localhost", 80),
                         Collections.singleton(new TopicPartition("topic", 0)));
@@ -830,7 +828,7 @@ public class StreamPartitionAssignorTest {
     public void shouldSetClusterMetadataOnAssignment() throws Exception {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
-        final List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic",
0));
+        final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic",
0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
                 Collections.singletonMap(new HostInfo("localhost", 80),
                         Collections.singleton(new TopicPartition("topic", 0)));
@@ -863,22 +861,47 @@ public class StreamPartitionAssignorTest {
         builder.setApplicationId(applicationId);
 
         KStream<Object, Object> stream1 = builder
+
+            // Task 1 (should get created):
             .stream("topic1")
+            // force repartitioning for aggregation
             .selectKey(new KeyValueMapper<Object, Object, Object>() {
                 @Override
                 public Object apply(Object key, Object value) {
                     return null;
                 }
             })
-            .through("topic2");
+            .groupByKey()
+
+            // Task 2 (should get created):
+            // create repartioning and changelog topic as task 1 exists
+            .count("count")
+
+            // force repartitioning for join, but second join input topic unknown
+            // -> internal repartitioning topic should not get created
+            .toStream()
+            .map(new KeyValueMapper<Object, Long, KeyValue<Object, Object>>()
{
+                @Override
+                public KeyValue<Object, Object> apply(Object key, Long value) {
+                    return null;
+                }
+            });
+
         builder
+            // Task 3 (should not get created because input topic unknown)
             .stream("unknownTopic")
+
+            // force repartitioning for join, but input topic unknown
+            // -> thus should not create internal repartitioning topic
             .selectKey(new KeyValueMapper<Object, Object, Object>() {
                 @Override
                 public Object apply(Object key, Object value) {
                     return null;
                 }
             })
+
+            // Task 4 (should not get created because input topics unknown)
+            // should not create any of both input repartition topics or any of both changelog
topics
             .join(
                 stream1,
                 new ValueJoiner() {
@@ -894,13 +917,16 @@ public class StreamPartitionAssignorTest {
         final String client = "client1";
 
         final StreamsConfig config = new StreamsConfig(configProps());
-        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder,
StreamsMetadataState.UNKNOWN_HOST), 0);
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier,
applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder,
StreamsMetadataState.UNKNOWN_HOST), 0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client));
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamThread.config,
mockClientSupplier.restoreConsumer);
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put(
             client,
             new PartitionAssignor.Subscription(
@@ -908,20 +934,22 @@ public class StreamPartitionAssignorTest {
                 new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
             )
         );
-        // TODO: Update the code accordingly,
-        // This line was added to fix the test failure since internalTopicManager is created
in the config method all the time.
-        partitionAssignor.internalTopicManager = null;
         final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata,
subscriptions);
 
+        final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
+        expectedCreatedInternalTopics.put(applicationId + "-count-repartition", 3);
+        expectedCreatedInternalTopics.put(applicationId + "-count-changelog", 3);
+        assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics));
+
         final List<TopicPartition> expectedAssignment = Arrays.asList(
             new TopicPartition("topic1", 0),
             new TopicPartition("topic1", 1),
             new TopicPartition("topic1", 2),
-            new TopicPartition("topic2", 0),
-            new TopicPartition("topic2", 1),
-            new TopicPartition("topic2", 2)
+            new TopicPartition(applicationId + "-count-repartition", 0),
+            new TopicPartition(applicationId + "-count-repartition", 1),
+            new TopicPartition(applicationId + "-count-repartition", 2)
         );
-        assertThat(expectedAssignment, equalTo(assignment.get(client).partitions()));
+        assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment)));
     }
 
     @Test
@@ -981,13 +1009,15 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder,
StreamsMetadataState.UNKNOWN_HOST), 0);
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier,
applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder,
StreamsMetadataState.UNKNOWN_HOST), 0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config,
mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put(
                 "consumer1",
                 new PartitionAssignor.Subscription(
@@ -1004,9 +1034,6 @@ public class StreamPartitionAssignorTest {
                 )
         );
         final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
-        // TODO: Update the code accordingly,
-        // This line was added to fix the test failure since internalTopicManager is created
in the config method all the time.
-        partitionAssignor.internalTopicManager = null;
         final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata,
subscriptions);
         final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());


Mime
View raw message