kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: fix table-table outer join and left join. more tests
Date Thu, 10 Dec 2015 07:02:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ec466d358 -> 3b350cdff


HOTFIX: fix table-table outer join and left join. more tests

guozhangwang

* fixed bugs in table-table outer/left joins
* added more tests

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #653 from ymatsuda/join_tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b350cdf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b350cdf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b350cdf

Branch: refs/heads/trunk
Commit: 3b350cdff795ec08dc77e60f127f2790149d8d52
Parents: ec466d3
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Dec 9 23:02:44 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Dec 9 23:02:44 2015 -0800

----------------------------------------------------------------------
 .../streams/kstream/internals/KTableImpl.java   |  28 ++-
 .../kstream/internals/KTableKTableJoin.java     |   9 +-
 .../internals/KTableKTableOuterJoin.java        | 112 +++++++++++
 .../internals/KTableKTableRightJoin.java        | 113 +++++++++++
 .../internals/KStreamKTableLeftJoinTest.java    | 171 +++++++++++++++++
 .../kstream/internals/KTableKTableJoinTest.java | 180 ++++++++++++++++++
 .../internals/KTableKTableLeftJoinTest.java     | 189 +++++++++++++++++++
 .../internals/KTableKTableOuterJoinTest.java    | 189 +++++++++++++++++++
 .../apache/kafka/test/KStreamTestDriver.java    |   4 +
 .../kafka/test/MockProcessorSupplier.java       |  13 ++
 10 files changed, 996 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 308e4f5..9362b6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -55,7 +55,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
 
-    public static final String LEFTJOIN_NAME = "KTABLE-LEFTJOIN-";
+    public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
+    public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
 
     public static final String MERGE_NAME = "KTABLE-MERGE-";
 
@@ -193,7 +194,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, name, joinThis, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinThis, allSourceNodes);
     }
 
     @SuppressWarnings("unchecked")
@@ -205,8 +206,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         String joinOtherName = topology.newName(OUTEROTHER_NAME);
         String joinMergeName = topology.newName(MERGE_NAME);
 
-        KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
-        KTableKTableLeftJoin<K, R, V1, V> joinOther = new KTableKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+        KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+        KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
         KTableMerge<K, R> joinMerge = new KTableMerge<>(
                 new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
                 new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
@@ -216,7 +217,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, name, joinMerge, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinThis, allSourceNodes);
     }
 
     @SuppressWarnings("unchecked")
@@ -224,13 +225,22 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        String name = topology.newName(LEFTJOIN_NAME);
+        String joinThisName = topology.newName(LEFTTHIS_NAME);
+        String joinOtherName = topology.newName(LEFTOTHER_NAME);
+        String joinMergeName = topology.newName(MERGE_NAME);
 
-        KTableKTableLeftJoin<K, R, V, V1> leftJoin = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+        KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+        KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+        KTableMerge<K, R> joinMerge = new KTableMerge<>(
+                new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+        );
 
-        topology.addProcessor(name, leftJoin, this.name);
+        topology.addProcessor(joinThisName, joinThis, this.name);
+        topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
+        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, name, leftJoin, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinThis, allSourceNodes);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 058e75d..1eaa009 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -100,14 +100,17 @@ class KTableKTableJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V
 
         @Override
         public V get(K key) {
+            V newValue = null;
             V1 value1 = valueGetter1.get(key);
 
             if (value1 != null) {
                 V2 value2 = valueGetter2.get(key);
-                return joiner.apply(value1, value2);
-            } else {
-                return null;
+
+                if (value2 != null)
+                    newValue = joiner.apply(value1, value2);
             }
+
+            return newValue;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
new file mode 100644
index 0000000..8d54d02
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableKTableOuterJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+
+    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    private final ValueJoiner<V1, V2, V> joiner;
+
+    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1,
+                          KTableImpl<K, ?, V2> table2,
+                          ValueJoiner<V1, V2, V> joiner) {
+        this.valueGetterSupplier1 = table1.valueGetterSupplier();
+        this.valueGetterSupplier2 = table2.valueGetterSupplier();
+        this.joiner = joiner;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        return new KTableKTableOuterJoinProcessor(valueGetterSupplier2.get());
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        return new KTableValueGetterSupplier<K, V>() {
+
+            public KTableValueGetter<K, V> get() {
+                return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+            }
+
+        };
+    }
+
+    private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final KTableValueGetter<K, V2> valueGetter;
+
+        public KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            valueGetter.init(context);
+        }
+
+        @Override
+        public void process(K key, V1 value1) {
+            V newValue = null;
+            V2 value2 = valueGetter.get(key);
+
+            if (value1 != null || value2 != null)
+                newValue = joiner.apply(value1, value2);
+
+            context().forward(key, newValue);
+        }
+    }
+
+    private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V1> valueGetter1;
+        private final KTableValueGetter<K, V2> valueGetter2;
+
+        public KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+            this.valueGetter1 = valueGetter1;
+            this.valueGetter2 = valueGetter2;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            valueGetter1.init(context);
+            valueGetter2.init(context);
+        }
+
+        @Override
+        public V get(K key) {
+            V newValue = null;
+            V1 value1 = valueGetter1.get(key);
+            V2 value2 = valueGetter2.get(key);
+
+            if (value1 != null || value2 != null)
+                newValue = joiner.apply(value1, value2);
+
+            return newValue;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
new file mode 100644
index 0000000..4219578
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableKTableRightJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+
+    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    private final ValueJoiner<V1, V2, V> joiner;
+
+    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1,
+                          KTableImpl<K, ?, V2> table2,
+                          ValueJoiner<V1, V2, V> joiner) {
+        this.valueGetterSupplier1 = table1.valueGetterSupplier();
+        this.valueGetterSupplier2 = table2.valueGetterSupplier();
+        this.joiner = joiner;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        return new KTableKTableRightJoinProcessor(valueGetterSupplier2.get());
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        return new KTableValueGetterSupplier<K, V>() {
+
+            public KTableValueGetter<K, V> get() {
+                return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+            }
+
+        };
+    }
+
+    private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final KTableValueGetter<K, V2> valueGetter;
+
+        public KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            valueGetter.init(context);
+        }
+
+        @Override
+        public void process(K key, V1 value1) {
+            V newValue = null;
+            V2 value2 = valueGetter.get(key);
+
+            if (value2 != null)
+                newValue = joiner.apply(value1, value2);
+
+            context().forward(key, newValue);
+        }
+
+    }
+
+    private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V1> valueGetter1;
+        private final KTableValueGetter<K, V2> valueGetter2;
+
+        public KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+            this.valueGetter1 = valueGetter1;
+            this.valueGetter2 = valueGetter2;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            valueGetter1.init(context);
+            valueGetter2.init(context);
+        }
+
+        @Override
+        public V get(K key) {
+            V2 value2 = valueGetter2.get(key);
+
+            if (value2 != null) {
+                V1 value1 = valueGetter1.get(key);
+                return joiner.apply(value1, value2);
+            } else {
+                return null;
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
new file mode 100644
index 0000000..adcf63a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKTableLeftJoinTest {
+
+    private String topic1 = "topic1";
+    private String topic2 = "topic2";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
+        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+            @Override
+            public KeyValue<Integer, String> apply(Integer key, String value) {
+                return KeyValue.pair(key, value);
+            }
+        };
+
+    @Test
+    public void testJoin() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KStream<Integer, String> stream;
+            KTable<Integer, String> table;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            stream = builder.from(keyDeserializer, valDeserializer, topic1);
+            table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            stream.leftJoin(table, joiner).process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should not produce any item.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+
+            // push all items to the other stream. this should not produce any item
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+            // push two items with null to the other stream as deletes. this should not produce any item.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            processor.checkAndClearResult();
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testNotJoinable() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<Integer, String> stream;
+        KTable<Integer, String> table;
+        MockProcessorSupplier<Integer, String> processor;
+
+        processor = new MockProcessorSupplier<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
+        table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+
+        stream.leftJoin(table, joiner).process(processor);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
new file mode 100644
index 0000000..9762cf7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableKTableJoinTest {
+
+    private String topic1 = "topic1";
+    private String topic2 = "topic2";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    private static class JoinedKeyValue extends KeyValue<Integer, String> {
+        public JoinedKeyValue(Integer key, String value) {
+            super(key, value);
+        }
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.join(table2, joiner);
+            joined.toStream().process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            KTableValueGetter<Integer, String> getter = getterSupplier.get();
+            getter.init(driver.context());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:null", "1:null");
+            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
+            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push two items with null to the other stream as deletes. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            processor.checkAndClearResult("0:null", "1:null");
+            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
+            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    private JoinedKeyValue kv(Integer key, String value) {
+        return new JoinedKeyValue(key, value);
+    }
+
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
+        for (JoinedKeyValue kv : expected) {
+            String value = getter.get(kv.key);
+            if (kv.value == null) {
+                assertNull(value);
+            } else {
+                assertEquals(kv.value, value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
new file mode 100644
index 0000000..e1ef830
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableKTableLeftJoinTest {
+
+    private String topic1 = "topic1";
+    private String topic2 = "topic2";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
+        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+            @Override
+            public KeyValue<Integer, String> apply(Integer key, String value) {
+                return KeyValue.pair(key, value);
+            }
+        };
+
+    private static class JoinedKeyValue extends KeyValue<Integer, String> {
+        public JoinedKeyValue(Integer key, String value) {
+            super(key, value);
+        }
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.leftJoin(table2, joiner);
+            joined.toStream().process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            KTableValueGetter<Integer, String> getter = getterSupplier.get();
+            getter.init(driver.context());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push two items with null to the other stream as deletes. this should produce two item.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    private JoinedKeyValue kv(Integer key, String value) {
+        return new JoinedKeyValue(key, value);
+    }
+
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
+        for (JoinedKeyValue kv : expected) {
+            String value = getter.get(kv.key);
+            if (kv.value == null) {
+                assertNull(value);
+            } else {
+                assertEquals(kv.value, value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
new file mode 100644
index 0000000..fc800b8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableKTableOuterJoinTest {
+
+    private String topic1 = "topic1";
+    private String topic2 = "topic2";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    private static class JoinedKeyValue extends KeyValue<Integer, String> {
+        public JoinedKeyValue(Integer key, String value) {
+            super(key, value);
+        }
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KTable<Integer, String> table1;
+            KTable<Integer, String> table2;
+            KTable<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
+            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            joined = table1.outerJoin(table2, joiner);
+            joined.toStream().process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            KTableValueGetter<Integer, String> getter = getterSupplier.get();
+            getter.init(driver.context());
+
+            // push two items to the primary stream. the other table is empty
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
+
+            // push two items to the other stream. this should produce two items.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
+
+            // push all items to the other stream. this should produce four items.
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push two items with null to the other stream as deletes. this should produce two item.
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], null);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+
+            // push all four items to the primary stream. this should produce four items.
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+
+            // push middle two items to the primary stream with null. this should produce two items.
+
+            for (int i = 1; i < 3; i++) {
+                driver.process(topic1, expectedKeys[i], null);
+            }
+
+            processor.checkAndClearResult("1:null", "2:null+YY2");
+            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    private JoinedKeyValue kv(Integer key, String value) {
+        return new JoinedKeyValue(key, value);
+    }
+
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
+        for (JoinedKeyValue kv : expected) {
+            String value = getter.get(kv.key);
+            if (kv.value == null) {
+                assertNull(value);
+            } else {
+                assertEquals(kv.value, value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 119f08f..8b16cf6 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -43,6 +43,10 @@ public class KStreamTestDriver {
         this(builder, null, null, null, null, null);
     }
 
+    public KStreamTestDriver(KStreamBuilder builder, File stateDir) {
+        this(builder, stateDir, null, null, null, null);
+    }
+
     public KStreamTestDriver(KStreamBuilder builder,
                              File stateDir,
                              Serializer<?> keySerializer, Deserializer<?> keyDeserializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b350cdf/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index f1aa167..510b458 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.util.ArrayList;
 
+import static org.junit.Assert.assertEquals;
+
 public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
 
     public final ArrayList<String> processed = new ArrayList<>();
@@ -56,4 +58,15 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
         }
 
     }
+
+    public void checkAndClearResult(String... expected) {
+        assertEquals(expected.length, processed.size());
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processed.get(i));
+        }
+
+        processed.clear();
+    }
+
 }


Mime
View raw message