kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8240: Fix NPE in Source.equals() (#6685)
Date Thu, 09 May 2019 13:48:23 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 17708df  KAFKA-8240: Fix NPE in Source.equals() (#6685)
17708df is described below

commit 17708df8c6f23794183b0106a622d3657a9e7bd6
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Thu May 9 15:48:11 2019 +0200

    KAFKA-8240: Fix NPE in Source.equals() (#6685)
    
    - backport of PR #6589 to 2.2 branch
    
    Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../internals/InternalTopologyBuilder.java         |  25 +++-
 .../internals/InternalTopologyBuilderTest.java     | 144 ++++++++++++++++-----
 2 files changed, 132 insertions(+), 37 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 0648fec..5c7203d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -280,7 +280,7 @@ public class InternalTopologyBuilder {
 
         @Override
         Source describe() {
-            return new Source(name, new HashSet<>(topics), pattern);
+            return new Source(name, topics.size() == 0 ? null : new HashSet<>(topics), pattern);
         }
     }
 
@@ -1310,6 +1310,9 @@ public class InternalTopologyBuilder {
         @Override
         public int compare(final TopologyDescription.Node node1,
                            final TopologyDescription.Node node2) {
+            if (node1.equals(node2)) {
+                return 0;
+            }
             final int size1 = ((AbstractNode) node1).size;
             final int size2 = ((AbstractNode) node2).size;
 
@@ -1428,6 +1431,7 @@ public class InternalTopologyBuilder {
         int size;
 
         AbstractNode(final String name) {
+            Objects.requireNonNull(name, "name cannot be null");
             this.name = name;
             this.size = 1;
         }
@@ -1464,6 +1468,13 @@ public class InternalTopologyBuilder {
                       final Set<String> topics,
                       final Pattern pattern) {
             super(name);
+            if (topics == null && pattern == null) {
+                throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null.");
+            }
+            if (topics != null && pattern != null) {
+                throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null.");
+            }
+
             this.topics = topics;
             this.topicPattern = pattern;
         }
@@ -1508,8 +1519,10 @@ public class InternalTopologyBuilder {
             final Source source = (Source) o;
             // omit successor to avoid infinite loops
             return name.equals(source.name)
-                && topics.equals(source.topics)
-                && topicPattern.equals(source.topicPattern);
+                && (topics == null && source.topics == null
+                    || topics != null && topics.equals(source.topics))
+                && (topicPattern == null && source.topicPattern == null
+                    || topicPattern != null && topicPattern.pattern().equals(source.topicPattern.pattern()));
         }
 
         @Override
@@ -1738,6 +1751,9 @@ public class InternalTopologyBuilder {
         @Override
         public int compare(final TopologyDescription.GlobalStore globalStore1,
                            final TopologyDescription.GlobalStore globalStore2) {
+            if (globalStore1.equals(globalStore2)) {
+                return 0;
+            }
             return globalStore1.id() - globalStore2.id();
         }
     }
@@ -1748,6 +1764,9 @@ public class InternalTopologyBuilder {
         @Override
         public int compare(final TopologyDescription.Subtopology subtopology1,
                            final TopologyDescription.Subtopology subtopology2) {
+            if (subtopology1.equals(subtopology2)) {
+                return 0;
+            }
             return subtopology1.id() - subtopology2.id();
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 1be5231..552934f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -23,15 +23,13 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
@@ -50,12 +48,13 @@ import java.util.regex.Pattern;
 import static java.time.Duration.ofSeconds;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -353,9 +352,9 @@ public class InternalTopologyBuilderTest {
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
 
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(),
mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(),
Collections.<String, InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(),
mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String,
InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(),
mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String,
InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(),
mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
+        expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(),
mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap()));
+        expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(),
mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -393,17 +392,17 @@ public class InternalTopologyBuilderTest {
         final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
         final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
         expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
-            Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
-            Collections.<String, InternalTopicConfig>emptyMap(),
-            Collections.singletonMap(store1, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store1,
Collections.<String, String>emptyMap()))));
+            Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
+            Collections.emptyMap(),
+            Collections.singletonMap(store1, new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap()))));
         expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
-            Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
-            Collections.<String, InternalTopicConfig>emptyMap(),
-            Collections.singletonMap(store2, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store2,
Collections.<String, String>emptyMap()))));
+            Collections.emptySet(), mkSet("topic-3", "topic-4"),
+            Collections.emptyMap(),
+            Collections.singletonMap(store2, new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap()))));
         expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
-            Collections.<String>emptySet(), mkSet("topic-5"),
-            Collections.<String, InternalTopicConfig>emptyMap(),
-            Collections.singletonMap(store3, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store3,
Collections.<String, String>emptyMap()))));
+            Collections.emptySet(), mkSet("topic-5"),
+            Collections.emptyMap(),
+            Collections.singletonMap(store3, new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap()))));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -499,12 +498,7 @@ public class InternalTopologyBuilderTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        builder.addProcessor(null, new ProcessorSupplier() {
-            @Override
-            public Processor get() {
-                return null;
-            }
-        });
+        builder.addProcessor(null, () -> null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -604,14 +598,14 @@ public class InternalTopologyBuilderTest {
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
-        final Map<String, String> properties1 = topicConfig1.getProperties(Collections.<String,
String>emptyMap(), 10000);
+        final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(),
10000);
         assertEquals(2, properties1.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE,
properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals("appId-store1-changelog", topicConfig1.name());
         assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig);
         final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
-        final Map<String, String> properties2 = topicConfig2.getProperties(Collections.<String,
String>emptyMap(), 10000);
+        final Map<String, String> properties2 = topicConfig2.getProperties(Collections.emptyMap(),
10000);
         assertEquals(2, properties2.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE,
properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG));
@@ -628,7 +622,7 @@ public class InternalTopologyBuilderTest {
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String,
String>emptyMap(), 10000);
+        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(),
10000);
         assertEquals(1, properties.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("appId-store-changelog", topicConfig.name());
@@ -642,7 +636,7 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "foo");
         final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String,
String>emptyMap(), 10000);
+        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(),
10000);
         assertEquals(5, properties.size());
         assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
@@ -708,32 +702,32 @@ public class InternalTopologyBuilderTest {
 
         assertTrue(iterator.hasNext());
         InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode)
iterator.next();
-        assertTrue(node.name.equals("source1"));
+        assertEquals("source1", node.name);
         assertEquals(6, node.size);
 
         assertTrue(iterator.hasNext());
         node = (InternalTopologyBuilder.AbstractNode) iterator.next();
-        assertTrue(node.name.equals("source2"));
+        assertEquals("source2", node.name);
         assertEquals(4, node.size);
 
         assertTrue(iterator.hasNext());
         node = (InternalTopologyBuilder.AbstractNode) iterator.next();
-        assertTrue(node.name.equals("processor2"));
+        assertEquals("processor2", node.name);
         assertEquals(3, node.size);
 
         assertTrue(iterator.hasNext());
         node = (InternalTopologyBuilder.AbstractNode) iterator.next();
-        assertTrue(node.name.equals("processor1"));
+        assertEquals("processor1", node.name);
         assertEquals(2, node.size);
 
         assertTrue(iterator.hasNext());
         node = (InternalTopologyBuilder.AbstractNode) iterator.next();
-        assertTrue(node.name.equals("processor3"));
+        assertEquals("processor3", node.name);
         assertEquals(2, node.size);
 
         assertTrue(iterator.hasNext());
         node = (InternalTopologyBuilder.AbstractNode) iterator.next();
-        assertTrue(node.name.equals("sink1"));
+        assertEquals("sink1", node.name);
         assertEquals(1, node.size);
     }
 
@@ -760,7 +754,7 @@ public class InternalTopologyBuilderTest {
         final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics();
         final List<String> topics = stateStoreAndTopics.get(storeBuilder.name());
 
-        assertTrue("Expected to contain two topics", topics.size() == 2);
+        assertEquals("Expected to contain two topics", 2, topics.size());
 
         assertTrue(topics.contains("topic-2"));
         assertTrue(topics.contains("topic-3"));
@@ -781,4 +775,86 @@ public class InternalTopologyBuilderTest {
             sameNameForSourceAndProcessor,
             new MockProcessorSupplier());
     }
+
+    @Test
+    public void shouldThrowIfNameIsNull() {
+        try {
+            new InternalTopologyBuilder.Source(null, Collections.emptySet(), null);
+            fail("Should have thrown NullPointerException");
+        } catch (final NullPointerException expected) {
+            assertEquals("name cannot be null", expected.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldThrowIfTopicAndPatternAreNull() {
+        try {
+            new InternalTopologyBuilder.Source("name", null, null);
+            fail("Should have thrown IllegalArgumentException");
+        } catch (final IllegalArgumentException expected) {
+            assertEquals("Either topics or pattern must be not-null, but both are null.", expected.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldThrowIfBothTopicAndPatternAreNotNull() {
+        try {
+            new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile(""));
+            fail("Should have thrown IllegalArgumentException");
+        } catch (final IllegalArgumentException expected) {
+            assertEquals("Either topics or pattern must be null, but both are not null.", expected.getMessage());
+        }
+    }
+
+    @Test
+    public void sourceShouldBeEqualIfNameAndTopicListAreTheSame() {
+        final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name",
Collections.singleton("topic"), null);
+        final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name",
Collections.singleton("topic"), null);
+
+        assertThat(base, equalTo(sameAsBase));
+    }
+
+    @Test
+    public void sourceShouldBeEqualIfNameAndPatternAreTheSame() {
+        final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name",
null, Pattern.compile("topic"));
+        final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name",
null, Pattern.compile("topic"));
+
+        assertThat(base, equalTo(sameAsBase));
+    }
+
+    @Test
+    public void sourceShouldNotBeEqualForDifferentNamesWithSameTopicList() {
+        final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name",
Collections.singleton("topic"), null);
+        final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2",
Collections.singleton("topic"), null);
+
+        assertThat(base, not(equalTo(differentName)));
+    }
+
+    @Test
+    public void sourceShouldNotBeEqualForDifferentNamesWithSamePattern() {
+        final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name",
null, Pattern.compile("topic"));
+        final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2",
null, Pattern.compile("topic"));
+
+        assertThat(base, not(equalTo(differentName)));
+    }
+
+    @Test
+    public void sourceShouldNotBeEqualForDifferentTopicList() {
+        final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name",
Collections.singleton("topic"), null);
+        final InternalTopologyBuilder.Source differentTopicList = new InternalTopologyBuilder.Source("name",
Collections.emptySet(), null);
+        final InternalTopologyBuilder.Source differentTopic = new InternalTopologyBuilder.Source("name",
Collections.singleton("topic2"), null);
+
+        assertThat(base, not(equalTo(differentTopicList)));
+        assertThat(base, not(equalTo(differentTopic)));
+    }
+
+    @Test
+    public void sourceShouldNotBeEqualForDifferentPattern() {
+        final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name",
null, Pattern.compile("topic"));
+        final InternalTopologyBuilder.Source differentPattern = new InternalTopologyBuilder.Source("name",
null, Pattern.compile("topic2"));
+        final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name",
null, Pattern.compile("top*"));
+
+        assertThat(base, not(equalTo(differentPattern)));
+        assertThat(base, not(equalTo(overlappingPattern)));
+    }
 }


Mime
View raw message