This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 9bf815e KAFKA-10102: update ProcessorTopology instead of rebuilding it (#8803)
9bf815e is described below
commit 9bf815e86f2edf27501d4dbb06a77dbf2e44ce55
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Mon Jun 8 15:55:16 2020 -0700
KAFKA-10102: update ProcessorTopology instead of rebuilding it (#8803)
Reviewers: Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
.../streams/processor/internals/AbstractTask.java | 6 +-
.../internals/InternalTopologyBuilder.java | 15 ++---
.../processor/internals/ProcessorTopology.java | 72 +++++++++++++++++++---
.../streams/processor/internals/SourceNode.java | 24 +-------
.../streams/processor/internals/StreamTask.java | 16 ++---
.../kafka/streams/processor/internals/Task.java | 3 +-
.../streams/processor/internals/TaskManager.java | 2 +-
.../processor/internals/GlobalStateTaskTest.java | 2 -
.../processor/internals/PartitionGroupTest.java | 4 +-
.../processor/internals/ProcessorTopologyTest.java | 25 +++++++-
.../internals/RecordDeserializerTest.java | 4 +-
.../processor/internals/RecordQueueTest.java | 2 +-
.../processor/internals/SourceNodeTest.java | 7 +--
.../processor/internals/StreamTaskTest.java | 21 ++++++-
.../processor/internals/TaskManagerTest.java | 6 +-
.../java/org/apache/kafka/test/MockSourceNode.java | 6 +-
16 files changed, 139 insertions(+), 76 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index adebaf3..f59571b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.List;
+import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
@@ -118,8 +120,8 @@ public abstract class AbstractTask implements Task {
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology
processorTopology) {
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
List<String>> nodeToSourceTopics) {
this.inputPartitions = topicPartitions;
- this.topology = processorTopology;
+ topology.updateSourceTopics(nodeToSourceTopics);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 6c3934b..7dc2df1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -270,16 +270,7 @@ public class InternalTopologyBuilder {
@Override
public ProcessorNode<K, V> build() {
- final List<String> sourceTopics = nodeToSourceTopics.get(name);
-
- // if it is subscribed via patterns, it is possible that the topic metadata has
not been updated
- // yet and hence the map from source node to topics is stale, in this case we
put the pattern as a place holder;
- // this should only happen for debugging since during runtime this function should
always be called after the metadata has updated.
- if (sourceTopics == null) {
- return new SourceNode<>(name, Collections.singletonList(String.valueOf(pattern)),
timestampExtractor, keyDeserializer, valDeserializer);
- } else {
- return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics),
timestampExtractor, keyDeserializer, valDeserializer);
- }
+ return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);
}
private boolean isMatch(final String topic) {
@@ -1105,6 +1096,10 @@ public class InternalTopologyBuilder {
return Collections.unmodifiableMap(topicGroups);
}
+ public Map<String, List<String>> nodeToSourceTopics() {
+ return Collections.unmodifiableMap(nodeToSourceTopics);
+ }
+
private RepartitionTopicConfig buildRepartitionTopicConfig(final String internalTopic,
final Optional<Integer>
numberOfPartitions) {
return numberOfPartitions
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 1497329..51b61b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.ArrayList;
+import java.util.HashMap;
import org.apache.kafka.streams.processor.StateStore;
import java.util.Collections;
@@ -23,11 +25,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ProcessorTopology {
+ private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class);
private final List<ProcessorNode<?, ?>> processorNodes;
- private final Map<String, SourceNode<?, ?>> sourcesByTopic;
+ private final Map<String, SourceNode<?, ?>> sourceNodesByName;
+ private final Map<String, SourceNode<?, ?>> sourceNodesByTopic;
private final Map<String, SinkNode<?, ?>> sinksByTopic;
private final Set<String> terminalNodes;
private final List<StateStore> stateStores;
@@ -38,14 +44,14 @@ public class ProcessorTopology {
private final Map<String, String> storeToChangelogTopic;
public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?>> sourcesByTopic,
+ final Map<String, SourceNode<?, ?>> sourceNodesByTopic,
final Map<String, SinkNode<?, ?>> sinksByTopic,
final List<StateStore> stateStores,
final List<StateStore> globalStateStores,
final Map<String, String> storeToChangelogTopic,
final Set<String> repartitionTopics) {
this.processorNodes = Collections.unmodifiableList(processorNodes);
- this.sourcesByTopic = Collections.unmodifiableMap(sourcesByTopic);
+ this.sourceNodesByTopic = new HashMap<>(sourceNodesByTopic);
this.sinksByTopic = Collections.unmodifiableMap(sinksByTopic);
this.stateStores = Collections.unmodifiableList(stateStores);
this.globalStateStores = Collections.unmodifiableList(globalStateStores);
@@ -58,18 +64,23 @@ public class ProcessorTopology {
terminalNodes.add(node.name());
}
}
+
+ this.sourceNodesByName = new HashMap<>();
+ for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) {
+ sourceNodesByName.put(source.name(), source);
+ }
}
public Set<String> sourceTopics() {
- return sourcesByTopic.keySet();
+ return sourceNodesByTopic.keySet();
}
public SourceNode<?, ?> source(final String topic) {
- return sourcesByTopic.get(topic);
+ return sourceNodesByTopic.get(topic);
}
public Set<SourceNode<?, ?>> sources() {
- return new HashSet<>(sourcesByTopic.values());
+ return new HashSet<>(sourceNodesByTopic.values());
}
public Set<String> sinkTopics() {
@@ -131,6 +142,27 @@ public class ProcessorTopology {
return false;
}
+ public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName)
{
+ if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
+ log.error("Set of source nodes do not match: \n" +
+ "sourceNodesByName = {}\n" +
+ "sourceTopicsByName = {}",
+ sourceNodesByName.keySet(), sourceTopicsByName.keySet());
+ throw new IllegalStateException("Tried to update source topics but source nodes
did not match");
+ }
+ sourceNodesByTopic.clear();
+ for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet())
{
+ final String nodeName = sourceEntry.getKey();
+ for (final String topic : sourceEntry.getValue()) {
+ if (sourceNodesByTopic.containsKey(topic)) {
+ throw new IllegalStateException("Topic " + topic + " was already registered
to source node "
+ + sourceNodesByTopic.get(topic).name());
+ }
+ sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName));
+ }
+ }
+ }
+
private String childrenToString(final String indent, final List<ProcessorNode<?,
?>> children) {
if (children == null || children.isEmpty()) {
return "";
@@ -167,12 +199,36 @@ public class ProcessorTopology {
* @return A string representation of this instance.
*/
public String toString(final String indent) {
+ final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new
HashMap<>();
+ for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet())
{
+ final String topic = sourceNodeEntry.getKey();
+ final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+ sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+ sourceToTopics.get(source).add(topic);
+ }
+
final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
// start from sources
- for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
- sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t",
source.children()));
+ for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry
: sourceToTopics.entrySet()) {
+ final SourceNode<?, ?> source = sourceNodeEntry.getKey();
+ final List<String> topics = sourceNodeEntry.getValue();
+ sb.append(source.toString(indent + "\t"))
+ .append(topicsToString(indent + "\t", topics))
+ .append(childrenToString(indent + "\t", source.children()));
+ }
+ return sb.toString();
+ }
+
+ private static String topicsToString(final String indent, final List<String> topics)
{
+ final StringBuilder sb = new StringBuilder();
+ sb.append(indent).append("\ttopics:\t\t[");
+ for (final String topic : topics) {
+ sb.append(topic);
+ sb.append(", ");
}
+ sb.setLength(sb.length() - 2); // remove the last comma
+ sb.append("]\n");
return sb.toString();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 717495e..39b8c0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -23,12 +23,8 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
-import java.util.List;
-
public class SourceNode<K, V> extends ProcessorNode<K, V> {
- private final List<String> topics;
-
private InternalProcessorContext context;
private Deserializer<K> keyDeserializer;
private Deserializer<V> valDeserializer;
@@ -36,22 +32,19 @@ public class SourceNode<K, V> extends ProcessorNode<K, V>
{
private Sensor processAtSourceSensor;
public SourceNode(final String name,
- final List<String> topics,
final TimestampExtractor timestampExtractor,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valDeserializer) {
super(name);
- this.topics = topics;
this.timestampExtractor = timestampExtractor;
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
}
public SourceNode(final String name,
- final List<String> topics,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valDeserializer) {
- this(name, topics, null, keyDeserializer, valDeserializer);
+ this(name, null, keyDeserializer, valDeserializer);
}
K deserializeKey(final String topic, final Headers headers, final byte[] data) {
@@ -109,21 +102,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V>
{
return toString("");
}
- /**
- * @return a string representation of this node starting with the given indent, useful
for debugging.
- */
- public String toString(final String indent) {
- final StringBuilder sb = new StringBuilder(super.toString(indent));
- sb.append(indent).append("\ttopics:\t\t[");
- for (final String topic : topics) {
- sb.append(topic);
- sb.append(", ");
- }
- sb.setLength(sb.length() - 2); // remove the last comma
- sb.append("]\n");
- return sb.toString();
- }
-
public TimestampExtractor getTimestampExtractor() {
return timestampExtractor;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b23a1a7..955adab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -496,12 +497,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology
processorTopology) {
- super.update(topicPartitions, processorTopology);
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
List<String>> nodeToSourceTopics) {
+ super.update(topicPartitions, nodeToSourceTopics);
partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
- if (state() != State.RESTORING) { // if task is RESTORING then topology will be initialized
in completeRestoration
- initializeTopology();
- }
}
@Override
@@ -512,18 +510,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
switch (state()) {
case CREATED:
- case RUNNING:
case RESTORING:
+ case RUNNING:
case SUSPENDED:
stateMgr.recycle();
recordCollector.close();
break;
-
case CLOSED:
- throw new IllegalStateException("Illegal state " + state() + " while closing
active task " + id);
-
+ throw new IllegalStateException("Illegal state " + state() + " while recycling
active task " + id);
default:
- throw new IllegalStateException("Unknown state " + state() + " while closing
active task " + id);
+ throw new IllegalStateException("Unknown state " + state() + " while recycling
active task " + id);
}
partitionGroup.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 9283e86..1318816 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -167,7 +168,7 @@ public interface Task {
/**
* Updates input partitions and topology after rebalance
*/
- void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology
processorTopology);
+ void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>>
nodeToSourceTopics);
/**
* Attempt a clean close but do not close the underlying state
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 12361d7..c4efacd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -357,7 +357,7 @@ public class TaskManager {
for (final TopicPartition topicPartition : topicPartitions) {
partitionToTask.put(topicPartition, task);
}
- task.update(topicPartitions, builder.buildSubtopology(task.id().topicGroupId));
+ task.update(topicPartitions, builder.nodeToSourceTopics());
}
task.resume();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 0895fa6..2319199 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -56,11 +56,9 @@ public class GlobalStateTaskTest {
private final TopicPartition t1 = new TopicPartition(topic1, 1);
private final TopicPartition t2 = new TopicPartition(topic2, 1);
private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(
- new String[]{topic1},
new StringDeserializer(),
new StringDeserializer());
private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode<>(
- new String[]{topic2},
new IntegerDeserializer(),
new IntegerDeserializer());
private final MockProcessorNode<?, ?> processorOne = new MockProcessorNode<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 7e7f1b9..e358ef2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -100,7 +100,7 @@ public class PartitionGroupTest {
private RecordQueue createQueue1() {
return new RecordQueue(
partition1,
- new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+ new MockSourceNode<>(intDeserializer, intDeserializer),
timestampExtractor,
new LogAndContinueExceptionHandler(),
new InternalMockProcessorContext(),
@@ -111,7 +111,7 @@ public class PartitionGroupTest {
private RecordQueue createQueue2() {
return new RecordQueue(
partition2,
- new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+ new MockSourceNode<>(intDeserializer, intDeserializer),
timestampExtractor,
new LogAndContinueExceptionHandler(),
new InternalMockProcessorContext(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index baa59f3..9f852cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -58,6 +58,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
+import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -88,7 +89,6 @@ public class ProcessorTopologyTest {
private TopologyTestDriver driver;
private final Properties props = new Properties();
-
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling
running tests in parallel ...
@@ -150,6 +150,29 @@ public class ProcessorTopologyTest {
}
@Test
+ public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
+ topology.addSource("source-1", "topic-1");
+ final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+ assertNull(processorTopology.source("topic-2"));
+ processorTopology.updateSourceTopics(Collections.singletonMap("source-1", asList("topic-1",
"topic-2")));
+
+ assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+ }
+
+ @Test
+ public void shouldUpdateSourceTopicsWithRemovedTopic() {
+ topology.addSource("source-1", "topic-1", "topic-2");
+ final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+ assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+
+ processorTopology.updateSourceTopics(Collections.singletonMap("source-1", Collections.singletonList("topic-1")));
+
+ assertNull(processorTopology.source("topic-2"));
+ }
+
+ @Test
public void testDrivingSimpleTopology() {
final int partition = 10;
driver = new TopologyTestDriver(createSimpleTopology(partition), props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index f4b1c7f..7299067 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -26,8 +26,6 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.junit.Test;
-import java.util.Collections;
-
import static org.junit.Assert.assertEquals;
public class RecordDeserializerTest {
@@ -80,7 +78,7 @@ public class RecordDeserializerTest {
final boolean valueThrowsException,
final Object key,
final Object value) {
- super("", Collections.emptyList(), null, null);
+ super("", null, null);
this.keyThrowsException = keyThrowsException;
this.valueThrowsException = valueThrowsException;
this.key = key;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 01c1520..6929bb8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -65,7 +65,7 @@ public class RecordQueueTest {
new MockRecordCollector()
);
private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
- = new MockSourceNode<>(new String[] {"topic"}, intDeserializer, intDeserializer);
+ = new MockSourceNode<>(intDeserializer, intDeserializer);
private final RecordQueue queue = new RecordQueue(
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 32ba4fb..028c4fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
@@ -44,7 +43,7 @@ import static org.junit.Assert.assertTrue;
public class SourceNodeTest {
@Test
public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
- final SourceNode<String, String> sourceNode = new MockSourceNode<>(new
String[]{""}, new TheDeserializer(), new TheDeserializer());
+ final SourceNode<String, String> sourceNode = new MockSourceNode<>(new
TheDeserializer(), new TheDeserializer());
final RecordHeaders headers = new RecordHeaders();
final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
assertThat(deserializeKey, is("topic" + headers + "data"));
@@ -52,7 +51,7 @@ public class SourceNodeTest {
@Test
public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
- final SourceNode<String, String> sourceNode = new MockSourceNode<>(new
String[]{""}, new TheDeserializer(), new TheDeserializer());
+ final SourceNode<String, String> sourceNode = new MockSourceNode<>(new
TheDeserializer(), new TheDeserializer());
final RecordHeaders headers = new RecordHeaders();
final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
assertThat(deserializedValue, is("topic" + headers + "data"));
@@ -85,7 +84,7 @@ public class SourceNodeTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client",
builtInMetricsVersion);
final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
final SourceNode<String, String> node =
- new SourceNode<>(context.currentNode().name(), Collections.singletonList("topic"),
new TheDeserializer(), new TheDeserializer());
+ new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new
TheDeserializer());
node.init(context);
final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 27135f8..d77cbcb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.HashSet;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
@@ -116,9 +117,9 @@ public class StreamTaskTest {
private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
- private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(new
String[] {topic1}, intDeserializer, intDeserializer);
- private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(new
String[] {topic2}, intDeserializer, intDeserializer);
- private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer,
Integer>(new String[] {topic2}, intDeserializer, intDeserializer) {
+ private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer,
intDeserializer);
+ private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer,
intDeserializer);
+ private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer,
Integer>(intDeserializer, intDeserializer) {
@Override
public void process(final Integer key, final Integer value) {
throw new RuntimeException("KABOOM!");
@@ -1806,6 +1807,20 @@ public class StreamTaskTest {
EasyMock.replay(stateManager);
}
+ @Test
+ public void shouldUpdatePartitions() {
+ task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+ final Set<TopicPartition> newPartitions = new HashSet<>(task.inputPartitions());
+ newPartitions.add(new TopicPartition("newTopic", 0));
+
+ task.update(newPartitions, mkMap(
+ mkEntry(source1.name(), asList(topic1, "newTopic")),
+ mkEntry(source2.name(), singletonList(topic2)))
+ );
+
+ assertThat(task.inputPartitions(), equalTo(newPartitions));
+ }
+
private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final Consumer<byte[],
byte[]> consumer) {
final StateStore stateStore = new MockKeyValueStore(storeName, true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 200d841..0354e4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2623,7 +2623,6 @@ public class TaskManagerTest {
private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
private Map<TopicPartition, Long> purgeableOffsets;
private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
- private InternalProcessorContext processorContext = mock(InternalProcessorContext.class);
private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>>
queue = new HashMap<>();
@@ -2723,6 +2722,11 @@ public class TaskManagerTest {
transitionTo(State.CLOSED);
}
+ @Override
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
List<String>> nodeToSourceTopics) {
+ inputPartitions = topicPartitions;
+ }
+
void setCommittableOffsetsAndMetadata(final Map<TopicPartition, OffsetAndMetadata>
committableOffsets) {
if (!active) {
throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate
for StandbyTasks");
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 89b00c4..f582202 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -16,13 +16,11 @@
*/
package org.apache.kafka.test;
-
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.SourceNode;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
public class MockSourceNode<K, V> extends SourceNode<K, V> {
@@ -36,8 +34,8 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
public boolean initialized;
public boolean closed;
- public MockSourceNode(final String[] topics, final Deserializer<K> keyDeserializer,
final Deserializer<V> valDeserializer) {
- super(NAME + INDEX.getAndIncrement(), Arrays.asList(topics), keyDeserializer, valDeserializer);
+ public MockSourceNode(final Deserializer<K> keyDeserializer, final Deserializer<V>
valDeserializer) {
+ super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer);
}
@Override
|