kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Add regression tests for KTable mapValues and filter (#5134)
Date Tue, 05 Jun 2018 18:13:28 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba5fd3c  MINOR: Add regression tests for KTable mapValues and filter (#5134)
ba5fd3c is described below

commit ba5fd3c8a40836dd91389e566253affb0677776f
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Tue Jun 5 13:12:57 2018 -0500

    MINOR: Add regression tests for KTable mapValues and filter (#5134)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/TopologyTest.java     | 145 ++++++++++++++++++++-
 1 file changed, 144 insertions(+), 1 deletion(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index f8f9c7d..8b47885 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
@@ -27,12 +29,14 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStore;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -101,7 +105,7 @@ public class TopologyTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTopicChooserWhenAddingSink() {
-        topology.addSink("name", (TopicNameExtractor) null);
+        topology.addSink("name", (TopicNameExtractor<Object, Object>) null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -859,6 +863,145 @@ public class TopologyTest {
         );
     }
 
+    @Test
+    public void kTableNonMaterializedMapValuesShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Object, Object> table = builder.table("input-topic");
+        table.mapValues((readOnlyKey, value) -> null);
+        final TopologyDescription describe = builder.build().describe();
+        System.out.println(describe);
+        Assert.assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-MAPVALUES-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-MAPVALUES-0000000003 (stores: [])\n" +
+                "      --> none\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n\n",
+            describe.toString());
+    }
+
+    @Test
+    public void kTableAnonymousMaterializedMapValuesShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Object, Object> table = builder.table("input-topic");
+        table.mapValues(
+            (readOnlyKey, value) -> null,
+            Materialized.with(null, null));
+        final TopologyDescription describe = builder.build().describe();
+        Assert.assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-MAPVALUES-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                // previously, this was
+                //   Processor: KTABLE-MAPVALUES-0000000004 (stores: [KTABLE-MAPVALUES-STATE-STORE-0000000003]
+                // but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
+                "    Processor: KTABLE-MAPVALUES-0000000004 (stores: [])\n" +
+                "      --> none\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "\n",
+            describe.toString());
+    }
+
+    @Test
+    public void kTableNamedMaterializedMapValuesShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Object, Object> table = builder.table("input-topic");
+        table.mapValues(
+            (readOnlyKey, value) -> null,
+            Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(null).withValueSerde(null));
+        final TopologyDescription describe = builder.build().describe();
+        Assert.assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-MAPVALUES-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-MAPVALUES-0000000003 (stores: [store-name])\n" +
+                "      --> none\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "\n",
+            describe.toString());
+    }
+
+    @Test
+    public void kTableNonMaterializedFilterShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Object, Object> table = builder.table("input-topic");
+        table.filter((key, value) -> false);
+        final TopologyDescription describe = builder.build().describe();
+        Assert.assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-FILTER-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-FILTER-0000000003 (stores: [])\n" +
+                "      --> none\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n\n",
+            describe.toString());
+    }
+
+    @Test
+    public void kTableAnonymousMaterializedFilterShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Object, Object> table = builder.table("input-topic");
+        table.filter(
+            (key, value) -> false,
+            Materialized.with(null, null));
+        final TopologyDescription describe = builder.build().describe();
+        Assert.assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-FILTER-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                // Previously, this was
+                //   Processor: KTABLE-FILTER-0000000004 (stores: [KTABLE-FILTER-STATE-STORE-0000000003]
+                // but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
+                "    Processor: KTABLE-FILTER-0000000004 (stores: [])\n" +
+                "      --> none\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "\n",
+            describe.toString());
+    }
+
+    @Test
+    public void kTableNamedMaterializedFilterShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Object, Object> table = builder.table("input-topic");
+        table.filter(
+            (key, value) -> false,
+            Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(null).withValueSerde(null));
+        final TopologyDescription describe = builder.build().describe();
+        Assert.assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-FILTER-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-FILTER-0000000003 (stores: [store-name])\n" +
+                "      --> none\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "\n",
+            describe.toString());
+    }
+
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final String... sourceTopic) {
         topology.addSource(null, sourceName, null, null, null, sourceTopic);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message