kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4662: adding test coverage for addSource methods with AutoOffsetReset
Date Fri, 03 Feb 2017 16:50:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ab4ebb4bb -> da082900b


KAFKA-4662: adding test coverage for addSource methods with AutoOffsetReset

…tReset

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #2464 from bbejeck/KAFKA-4662_improve_topology_builder_test_coverage


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da082900
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da082900
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da082900

Branch: refs/heads/trunk
Commit: da082900b3d800ba43616706def611df7a00308f
Parents: ab4ebb4
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Fri Feb 3 08:50:20 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Feb 3 08:50:20 2017 -0800

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilderTest.java  | 88 ++++++++++++++++++++
 1 file changed, 88 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da082900/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 2f3a450..23d6704 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
@@ -50,9 +52,95 @@ import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TopologyBuilderTest {
 
+
+    @Test
+    public void shouldAddSourceWithOffsetReset() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        final String earliestTopic = "earliestTopic";
+        final String latestTopic = "latestTopic";
+
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", earliestTopic);
+        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", latestTopic);
+
+        assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
+        assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
+
+    }
+
+    @Test
+    public void shouldAddSourcePatternWithOffsetReset() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        
+        final String earliestTopicPattern = "earliest.*Topic";
+        final String latestTopicPattern = "latest.*Topic";
+
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", Pattern.compile(earliestTopicPattern));
+        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", Pattern.compile(latestTopicPattern));
+
+        assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
+        assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
+    }
+
+    @Test
+    public void shouldAddSourceWithoutOffsetReset() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        final Serde<String> stringSerde = Serdes.String();
+        final Pattern expectedPattern = Pattern.compile("test-topic");
+
+        builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), "test-topic");
+
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+        assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
+        assertEquals(builder.latestResetTopicsPattern().pattern(), "");
+    }
+
+    @Test
+    public void shouldAddPatternSourceWithoutOffsetReset() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        final Serde<String> stringSerde = Serdes.String();
+        final Pattern expectedPattern = Pattern.compile("test-.*");
+        
+        builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*"));
+
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+        assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
+        assertEquals(builder.latestResetTopicsPattern().pattern(), "");
+    }
+
+    @Test
+    public void shouldNotAllowOffsetResetSourceWithoutTopics() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        final Serde<String> stringSerde = Serdes.String();
+
+        try {
+            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), new String[]{});
+            fail("Should throw TopologyBuilderException with no topics");
+        } catch (TopologyBuilderException tpe) {
+            //no-op
+        }
+    }
+
+    @Test
+    public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        final Serde<String> stringSerde = Serdes.String();
+
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+        try {
+            builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+            fail("Should throw TopologyBuilderException for duplicate source name");
+        } catch (TopologyBuilderException tpe) {
+            //no-op
+        }
+    }
+
+
+    
     @Test(expected = TopologyBuilderException.class)
     public void testAddSourceWithSameName() {
         final TopologyBuilder builder = new TopologyBuilder();


Mime
View raw message