kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/5] kafka git commit: HOTFIX: follow up on KAFKA-4275
Date Wed, 23 Nov 2016 15:28:37 GMT
HOTFIX: follow up on KAFKA-4275

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2039 from mjsax/hotfix-ktableLeftJoin


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a24d3a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a24d3a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a24d3a2

Branch: refs/heads/0.10.1
Commit: 0a24d3a258a544314d8ea995157a351f9ca8994e
Parents: 4fdbc72
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Oct 19 20:58:18 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 23 07:18:15 2016 -0800

----------------------------------------------------------------------
 .../streams/kstream/internals/KTableImpl.java   |  14 +-
 .../KTableKTableJoinIntegrationTest.java        | 280 +++++++++++++++++++
 2 files changed, 287 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0a24d3a2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 7ce0bbb..06feb0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -297,8 +297,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
-        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }
@@ -325,8 +325,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
-        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }
@@ -352,8 +352,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
-        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a24d3a2/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
new file mode 100644
index 0000000..85e2cf7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableKTableJoinIntegrationTest {
+    private final static int NUM_BROKERS = 1;
+
+    @ClassRule
+    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final static MockTime MOCK_TIME = CLUSTER.time;
+    private final static String TABLE_1 = "table1";
+    private final static String TABLE_2 = "table2";
+    private final static String TABLE_3 = "table3";
+    private final static String OUTPUT = "output-";
+    private static Properties streamsConfig;
+    private KafkaStreams streams;
+    private final static Properties CONSUMER_CONFIG = new Properties();
+
+    @Parameterized.Parameter(value = 0)
+    public JoinType joinType1;
+    @Parameterized.Parameter(value = 1)
+    public JoinType joinType2;
+    @Parameterized.Parameter(value = 2)
+    public List<KeyValue<String, String>> expectedResult;
+
+    //Single parameter, use Object[]
+    @Parameterized.Parameters
+    public static Object[] parameters() {
+        return new Object[][]{
+            {JoinType.INNER, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null))
+            },
+            {JoinType.INNER, JoinType.LEFT, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null)
+            )},
+            {JoinType.INNER, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C3")
+            )},
+            {JoinType.LEFT, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null)
+            )},
+            {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null)
+            )},
+            {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C3")
+            )},
+            {JoinType.OUTER, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")
+            )},
+            {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")
+            )},
+            {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")
+            )}
+        };
+    }
+
+    public static Object[] data() {
+        return new Object[]{0, 10 * 1024 * 1024L};
+    }
+
+    @BeforeClass
+    public static void beforeTest() throws Exception {
+        CLUSTER.createTopic(TABLE_1);
+        CLUSTER.createTopic(TABLE_2);
+        CLUSTER.createTopic(TABLE_3);
+        CLUSTER.createTopic(OUTPUT);
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        final List<KeyValue<String, String>> table1 = Arrays.asList(
+            new KeyValue<>("a", "A1"),
+            new KeyValue<>("b", "B1")
+        );
+
+        final List<KeyValue<String, String>> table2 = Arrays.asList(
+            new KeyValue<>("b", "B2"),
+            new KeyValue<>("c", "C2")
+        );
+
+        final List<KeyValue<String, String>> table3 = Arrays.asList(
+            new KeyValue<>("a", "A3"),
+            new KeyValue<>("b", "B3"),
+            new KeyValue<>("c", "C3")
+        );
+
+        // put table 3 first, to make sure data is there when joining T1 with T2
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
+
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    }
+
+    @Before
+    public void before() throws Exception {
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close();
+            streams = null;
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+    }
+
+    private enum JoinType {
+        INNER, LEFT, OUTER
+    }
+
+    private KafkaStreams prepareTopology() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
+        final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
+        final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
+
+        join(join(table1, table2, joinType1), table3, joinType2).to(OUTPUT);
+
+        return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
+    }
+
+    private KTable<String, String> join(KTable<String, String> first, KTable<String, String> second, JoinType joinType) {
+        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+            @Override
+            public String apply(final String value1, final String value2) {
+                return value1 + "-" + value2;
+            }
+        };
+
+        switch (joinType) {
+            case INNER:
+                return first.join(second, joiner);
+            case LEFT:
+                return first.leftJoin(second, joiner);
+            case OUTER:
+                return first.outerJoin(second, joiner);
+        }
+
+        throw new RuntimeException("Unknown join type.");
+    }
+
+    @Test
+    public void KTableKTableJoin() throws Exception {
+        System.out.println("join: " + joinType1 + "-" + joinType2);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+
+        streams = prepareTopology();
+        streams.start();
+
+
+        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            CONSUMER_CONFIG,
+            OUTPUT,
+            expectedResult.size());
+
+        assertThat(result, equalTo(expectedResult));
+    }
+
+}


Mime
View raw message