kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ableegold...@apache.org
Subject [kafka] branch trunk updated: MINOR: Increase unit test coverage of method ProcessorTopology#updateSourceTopics() (#9654)
Date Tue, 01 Dec 2020 19:00:27 GMT
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9211ff6  MINOR: Increase unit test coverage of method ProcessorTopology#updateSourceTopics()
(#9654)
9211ff6 is described below

commit 9211ff6ffd06485a4c3bc41185be22bac0d961d8
Author: Bruno Cadonna <bruno@confluent.io>
AuthorDate: Tue Dec 1 19:58:57 2020 +0100

    MINOR: Increase unit test coverage of method ProcessorTopology#updateSourceTopics() (#9654)
    
    The unit tests for method ProcessorTopology#updateSourceTopics() did not cover all
    code paths.
    
    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
---
 .../processor/internals/ProcessorTopology.java     |  1 -
 .../processor/internals/ProcessorTopologyTest.java | 93 +++++++++++++++++++---
 2 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index c4821c2..0a0118a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -172,7 +172,6 @@ public class ProcessorTopology {
                 }
                 sourceNodesByTopic.put(topic, sourceNode);
             }
-
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 8010dcf..d76ebaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -53,6 +53,7 @@ import org.junit.Test;
 import java.io.File;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.Set;
@@ -64,11 +65,15 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class ProcessorTopologyTest {
@@ -151,42 +156,106 @@ public class ProcessorTopologyTest {
 
     @Test
     public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
-        topology.addSource("source-1", "topic-1");
+        final String sourceNode = "source-1";
+        final String topic = "topic-1";
+        final String newTopic = "topic-2";
+        topology.addSource(sourceNode, topic);
         final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+        assertThat(processorTopology.source(newTopic), is(nullValue()));
 
-        assertNull(processorTopology.source("topic-2"));
-        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", asList("topic-1",
"topic-2")));
+        processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode, asList(topic,
newTopic)));
 
-        assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+        assertThat(processorTopology.source(newTopic).name(), equalTo(sourceNode));
     }
 
     @Test
     public void shouldUpdateSourceTopicsWithRemovedTopic() {
-        topology.addSource("source-1", "topic-1", "topic-2");
+        final String sourceNode = "source-1";
+        final String topic = "topic-1";
+        final String topicToRemove = "topic-2";
+        topology.addSource(sourceNode, topic, topicToRemove);
         final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+        assertThat(processorTopology.source(topicToRemove).name(), equalTo(sourceNode));
+
+        processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode, Collections.singletonList(topic)));
 
-        assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+        assertThat(processorTopology.source(topicToRemove), is(nullValue()));
+    }
+
+    @Test
+    public void shouldUpdateSourceTopicsWithAllTopicsRemoved() {
+        final String sourceNode = "source-1";
+        final String topic = "topic-1";
+        topology.addSource(sourceNode, topic);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+        assertThat(processorTopology.source(topic).name(), equalTo(sourceNode));
 
-        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", Collections.singletonList("topic-1")));
+        processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode, Collections.emptyList()));
 
-        assertNull(processorTopology.source("topic-2"));
+        assertThat(processorTopology.source(topic), is(nullValue()));
     }
 
     @Test
     public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
-        topology.addSource("source-1", "topic-1");
+        final String sourceNodeWithinSubtopology = "source-1";
+        final String sourceNodeOutsideSubtopology = "source-2";
+        final String topicWithinSubtopology = "topic-1";
+        final String topicOutsideSubtopology = "topic-2";
+        topology.addSource(sourceNodeWithinSubtopology, topicWithinSubtopology);
         final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
 
         processorTopology.updateSourceTopics(mkMap(
-            mkEntry("source-1", Collections.singletonList("topic-1")),
-            mkEntry("source-2", Collections.singletonList("topic-2")))
+            mkEntry(sourceNodeWithinSubtopology, Collections.singletonList(topicWithinSubtopology)),
+            mkEntry(sourceNodeOutsideSubtopology, Collections.singletonList(topicOutsideSubtopology))
+            )
         );
 
-        assertNull(processorTopology.source("topic-2"));
+        assertThat(processorTopology.source(topicOutsideSubtopology), is(nullValue()));
         assertThat(processorTopology.sources().size(), equalTo(1));
     }
 
     @Test
+    public void shouldThrowIfSourceNodeToUpdateDoesNotExist() {
+        final String existingSourceNode = "source-1";
+        final String nonExistingSourceNode = "source-2";
+        final String topicOfExistingSourceNode = "topic-1";
+        final String topicOfNonExistingSourceNode = "topic-2";
+        topology.addSource(nonExistingSourceNode, topicOfNonExistingSourceNode);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+        final Throwable exception = assertThrows(
+            IllegalStateException.class,
+            () -> processorTopology.updateSourceTopics(Collections.singletonMap(
+                existingSourceNode, Collections.singletonList(topicOfExistingSourceNode)
+            ))
+        );
+        assertThat(exception.getMessage(), is("Node " + nonExistingSourceNode + " not found
in full topology"));
+    }
+
+    @Test
+    public void shouldThrowIfMultipleSourceNodeOfSameSubtopologySubscribedToSameTopic() {
+        final String sourceNode = "source-1";
+        final String updatedSourceNode = "source-2";
+        final String doublySubscribedTopic = "topic-1";
+        final String topic = "topic-2";
+        topology.addSource(sourceNode, doublySubscribedTopic);
+        topology.addSource(updatedSourceNode, topic);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+        final Throwable exception = assertThrows(
+            IllegalStateException.class,
+            () -> processorTopology.updateSourceTopics(mkMap(
+                mkEntry(sourceNode, Collections.singletonList(doublySubscribedTopic)),
+                mkEntry(updatedSourceNode, Arrays.asList(topic, doublySubscribedTopic))
+            ))
+        );
+        assertThat(
+            exception.getMessage(),
+            startsWith("Topic " + doublySubscribedTopic + " was already registered to source
node")
+        );
+    }
+
+    @Test
     public void testDrivingSimpleTopology() {
         final int partition = 10;
         driver = new TopologyTestDriver(createSimpleTopology(partition), props);


Mime
View raw message