Repository: kafka
Updated Branches:
refs/heads/trunk 46666a1f6 -> 17cb4fe52
HOTFIX: follow up on KAFKA-4275
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #2039 from mjsax/hotfix-ktableLeftJoin
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/17cb4fe5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/17cb4fe5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/17cb4fe5
Branch: refs/heads/trunk
Commit: 17cb4fe52f74fcc1b7c43baa4649e0a4aba80fa3
Parents: 46666a1
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Oct 19 20:58:18 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Oct 19 20:58:18 2016 -0700
----------------------------------------------------------------------
.../streams/kstream/internals/KTableImpl.java | 14 +-
.../KTableKTableJoinIntegrationTest.java | 280 +++++++++++++++++++
2 files changed, 287 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/17cb4fe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index fc1c076..6423cff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -22,10 +22,10 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -305,8 +305,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
- topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+ topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+ topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@@ -333,8 +333,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
- topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+ topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+ topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@@ -360,8 +360,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
- topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+ topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+ topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
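
The three hunks above make the same fix: each join processor is now connected to every state store reachable through the table's value getter (valueGetterSupplier().storeNames()), instead of the single store named by getStoreName(). The distinction matters when one side of the join is itself the result of an earlier join, because such a KTable is backed by more than one store. A minimal sketch of the topology shape that previously failed to build, and that the new integration test below exercises (0.10.1-era API; class, topic, and store names are illustrative):

    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.ValueJoiner;

    public class CascadedJoinSketch {
        public static void main(final String[] args) {
            final KStreamBuilder builder = new KStreamBuilder();
            final KTable<String, String> t1 = builder.table("topic1", "store1");
            final KTable<String, String> t2 = builder.table("topic2", "store2");
            final KTable<String, String> t3 = builder.table("topic3", "store3");

            final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
                @Override
                public String apply(final String v1, final String v2) {
                    return v1 + "-" + v2;
                }
            };

            // The result of t1.join(t2) reads from both store1 and store2, so
            // the processors of the second join must be connected to both
            // stores; the old wiring passed only a single store name.
            t1.join(t2, joiner).join(t3, joiner);
        }
    }
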
http://git-wip-us.apache.org/repos/asf/kafka/blob/17cb4fe5/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
new file mode 100644
index 0000000..85e2cf7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableKTableJoinIntegrationTest {
+ private final static int NUM_BROKERS = 1;
+
+ @ClassRule
+ public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ private final static MockTime MOCK_TIME = CLUSTER.time;
+ private final static String TABLE_1 = "table1";
+ private final static String TABLE_2 = "table2";
+ private final static String TABLE_3 = "table3";
+ private final static String OUTPUT = "output-";
+ private static Properties streamsConfig;
+ private KafkaStreams streams;
+ private final static Properties CONSUMER_CONFIG = new Properties();
+
+ @Parameterized.Parameter(value = 0)
+ public JoinType joinType1;
+ @Parameterized.Parameter(value = 1)
+ public JoinType joinType2;
+ @Parameterized.Parameter(value = 2)
+ public List<KeyValue<String, String>> expectedResult;
+
+ // Each element is an Object[] of {first join type, second join type, expected result}
+ @Parameterized.Parameters
+ public static Object[] parameters() {
+ return new Object[][]{
+ {JoinType.INNER, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null))
+ },
+ {JoinType.INNER, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null)
+ )},
+ {JoinType.INNER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C3")
+ )},
+ {JoinType.LEFT, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null)
+ )},
+ {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null)
+ )},
+ {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C3")
+ )},
+ {JoinType.OUTER, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")
+ )},
+ {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")
+ )},
+ {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")
+ )}
+ };
+ }
+
+ public static Object[] data() {
+ return new Object[]{0, 10 * 1024 * 1024L};
+ }
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ CLUSTER.createTopic(TABLE_1);
+ CLUSTER.createTopic(TABLE_2);
+ CLUSTER.createTopic(TABLE_3);
+ CLUSTER.createTopic(OUTPUT);
+
+ streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+ streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+ streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+ final Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ final List<KeyValue<String, String>> table1 = Arrays.asList(
+ new KeyValue<>("a", "A1"),
+ new KeyValue<>("b", "B1")
+ );
+
+ final List<KeyValue<String, String>> table2 = Arrays.asList(
+ new KeyValue<>("b", "B2"),
+ new KeyValue<>("c", "C2")
+ );
+
+ final List<KeyValue<String, String>> table3 = Arrays.asList(
+ new KeyValue<>("a", "A3"),
+ new KeyValue<>("b", "B3"),
+ new KeyValue<>("c", "C3")
+ );
+
+ // produce table 3 first, so its data is already there when the results of joining T1 with T2 arrive
+ IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
+ IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
+ IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
+
+
+ CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
+ CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ }
+
+ @Before
+ public void before() throws Exception {
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+ }
+
+ @After
+ public void after() throws Exception {
+ if (streams != null) {
+ streams.close();
+ streams = null;
+ }
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+ }
+
+ private enum JoinType {
+ INNER, LEFT, OUTER
+ }
+
+ private KafkaStreams prepareTopology() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
+ final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
+ final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
+
+ join(join(table1, table2, joinType1), table3, joinType2).to(OUTPUT);
+
+ return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
+ }
+
+ private KTable<String, String> join(KTable<String, String> first, KTable<String, String> second, JoinType joinType) {
+ final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+ @Override
+ public String apply(final String value1, final String value2) {
+ return value1 + "-" + value2;
+ }
+ };
+
+ switch (joinType) {
+ case INNER:
+ return first.join(second, joiner);
+ case LEFT:
+ return first.leftJoin(second, joiner);
+ case OUTER:
+ return first.outerJoin(second, joiner);
+ }
+
+ throw new RuntimeException("Unknown join type.");
+ }
+
+ @Test
+ public void KTableKTableJoin() throws Exception {
+ System.out.println("join: " + joinType1 + "-" + joinType2);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+
+ streams = prepareTopology();
+ streams.start();
+
+
+ final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ CONSUMER_CONFIG,
+ OUTPUT,
+ expectedResult.size());
+
+ assertThat(result, equalTo(expectedResult));
+ }
+
+}
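
For reference, the expected strings in the parameters above fall out of cascading the joiner: the first join emits value1 + "-" + value2, that result becomes value1 of the second join, and for left/outer joins a missing side arrives as null, which Java string concatenation renders as "null". A plain-Java check of two representative cases (class and method names are illustrative, not part of the commit):

    public class JoinerArithmetic {
        // Same concatenation as the ValueJoiner in the test above.
        static String join(final String v1, final String v2) {
            return v1 + "-" + v2;
        }

        public static void main(final String[] args) {
            // Key "b" exists in all three tables: join("B1", "B2"), then "B3".
            System.out.println(join(join("B1", "B2"), "B3")); // prints B1-B2-B3
            // Key "a" is missing from table2; a left join passes null as the
            // right-hand value before the result is joined with "A3".
            System.out.println(join(join("A1", null), "A3")); // prints A1-null-A3
        }
    }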