kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2962: stream-table table-table joins
Date Wed, 09 Dec 2015 07:33:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 454d7d090 -> 991aad23b


KAFKA-2962: stream-table table-table joins

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #644 from ymatsuda/join_methods


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/991aad23
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/991aad23
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/991aad23

Branch: refs/heads/trunk
Commit: 991aad23baa2f55d405d374b0a01785acdc63974
Parents: 454d7d0
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Dec 8 23:33:46 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 8 23:33:46 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   |  10 ++
 .../kafka/streams/kstream/KStreamBuilder.java   |   2 +-
 .../apache/kafka/streams/kstream/KTable.java    |  33 +++++
 .../kstream/internals/AbstractStream.java       |  64 ++++++++
 .../streams/kstream/internals/KStreamImpl.java  |  35 +++--
 .../streams/kstream/internals/KStreamJoin.java  |   9 --
 .../internals/KStreamKTableLeftJoin.java        |  62 ++++++++
 .../kstream/internals/KStreamWindowedImpl.java  |   2 +-
 .../KTableDerivedValueGetterSupplier.java       |  28 ----
 .../streams/kstream/internals/KTableFilter.java |  15 +-
 .../streams/kstream/internals/KTableImpl.java   | 147 ++++++++++++-------
 .../kstream/internals/KTableKTableJoin.java     | 115 +++++++++++++++
 .../kstream/internals/KTableKTableLeftJoin.java | 112 ++++++++++++++
 .../kstream/internals/KTableMapValues.java      |  14 +-
 .../streams/kstream/internals/KTableMerge.java  |  92 ++++++++++++
 .../internals/KTableProcessorSupplier.java      |   4 +-
 .../streams/kstream/internals/KTableSource.java |   6 +-
 17 files changed, 634 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 93303eb..d3931ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -172,4 +172,14 @@ public interface KStream<K, V> {
      */
     void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
 
+    /**
+     * Combines values of this stream with KTable using Left Join.
+     *
+     * @param ktable the instance of KTable joined with this stream
+     * @param joiner ValueJoiner
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     */
+    <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ca1a10d..0ed5144 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -102,7 +102,7 @@ public class KStreamBuilder extends TopologyBuilder {
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableSource<>(topic);
         addProcessor(name, processorSupplier, source);
 
-        return new KTableImpl<>(this, name, processorSupplier, source, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer);
+        return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerializer, valSerializer, keyDeserializer, valDeserializer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 75fb87a..c6e7975 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -105,4 +105,37 @@ public interface KTable<K, V> {
      */
     KStream<K, V> toStream();
 
+    /**
+     * Combines values of this KTable with another KTable using Inner Join.
+     *
+     * @param other the instance of KTable joined with this stream
+     * @param joiner ValueJoiner
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     * @return the instance of KStream
+     */
+    <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
+    /**
+     * Combines values of this KTable with another KTable using Outer Join.
+     *
+     * @param other the instance of KTable joined with this stream
+     * @param joiner ValueJoiner
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     * @return the instance of KStream
+     */
+    <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
+    /**
+     * Combines values of this KTable with another KTable using Left Join.
+     *
+     * @param other the instance of KTable joined with this stream
+     * @param joiner ValueJoiner
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     * @return the instance of KStream
+     */
+    <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
new file mode 100644
index 0000000..fa34ba1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class AbstractStream<K> {
+
+    protected final KStreamBuilder topology;
+    protected final String name;
+    protected final Set<String> sourceNodes;
+
+    public AbstractStream(KStreamBuilder topology, String name, Set<String> sourceNodes) {
+        this.topology = topology;
+        this.name = name;
+        this.sourceNodes = sourceNodes;
+    }
+
+    protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
+        Set<String> thisSourceNodes = sourceNodes;
+        Set<String> otherSourceNodes = other.sourceNodes;
+
+        if (thisSourceNodes == null || otherSourceNodes == null)
+            throw new KafkaException("not joinable");
+
+        Set<String> allSourceNodes = new HashSet<>();
+        allSourceNodes.addAll(thisSourceNodes);
+        allSourceNodes.addAll(otherSourceNodes);
+
+        topology.copartitionSources(allSourceNodes);
+
+        return allSourceNodes;
+    }
+
+    public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
+        return new ValueJoiner<T2, T1, R>() {
+            @Override
+            public R apply(T2 value2, T1 value1) {
+                return joiner.apply(value1, value2);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index fc8f4c6..67a2d27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,8 +20,10 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamWindowed;
@@ -35,7 +37,7 @@ import java.lang.reflect.Array;
 import java.util.HashSet;
 import java.util.Set;
 
-public class KStreamImpl<K, V> implements KStream<K, V> {
+public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
 
     private static final String FILTER_NAME = "KSTREAM-FILTER-";
 
@@ -65,18 +67,14 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
 
+    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
+
     public static final String MERGE_NAME = "KSTREAM-MERGE-";
 
     public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
-    protected final KStreamBuilder topology;
-    public final String name;
-    protected final Set<String> sourceNodes;
-
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
-        this.topology = topology;
-        this.name = name;
-        this.sourceNodes = sourceNodes;
+        super(topology, name, sourceNodes);
     }
 
     @Override
@@ -191,9 +189,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
                                  Serializer<V> valSerializer,
                                  Deserializer<K> keyDeserializer,
                                  Deserializer<V> valDeserializer) {
-        String sendName = topology.newName(SINK_NAME);
-
-        topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+        to(topic, keySerializer, valSerializer);
 
         return topology.from(keyDeserializer, valDeserializer, topic);
     }
@@ -205,9 +201,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public void to(String topic) {
-        String name = topology.newName(SINK_NAME);
-
-        topology.addSink(name, topic, this.name);
+        to(topic, null, null);
     }
 
     @Override
@@ -244,4 +238,17 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
         topology.addProcessor(name, processorSupplier, this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+        String name = topology.newName(LEFTJOIN_NAME);
+
+        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
+
+        return new KStreamImpl<>(topology, name, allSourceNodes);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
index 5e8186e..eefb8c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
@@ -81,13 +81,4 @@ class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
         }
     }
 
-    public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
-        return new ValueJoiner<T2, T1, R>() {
-            @Override
-            public R apply(T2 value2, T1 value1) {
-                return joiner.apply(value1, value2);
-            }
-        };
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
new file mode 100644
index 0000000..51a6277
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamKTableLeftJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
+
+    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
+    private final ValueJoiner<V1, V2, V> joiner;
+
+    KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, V> joiner) {
+        this.valueGetterSupplier = table.valueGetterSupplier();
+        this.joiner = joiner;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get());
+    }
+
+    private class KStreamKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final KTableValueGetter<K, V2> valueGetter;
+
+        public KStreamKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            valueGetter.init(context);
+        }
+
+        @Override
+        public void process(K key, V1 value) {
+            context().forward(key, joiner.apply(value, valueGetter.get(key)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
index 100fbee..c71c11b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -50,7 +50,7 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
         allSourceNodes.addAll(((KStreamWindowedImpl<K, V1>) other).sourceNodes);
 
         KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
-        KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));
+        KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, reverseJoiner(valueJoiner));
         KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
 
         String joinThisName = topology.newName(JOINTHIS_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
deleted file mode 100644
index 731d7f7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-public abstract class KTableDerivedValueGetterSupplier<K, V1, V2> implements KTableValueGetterSupplier<K, V2> {
-
-    protected final KTableValueGetterSupplier<K, V1> parentValueGetterSupplier;
-
-    public KTableDerivedValueGetterSupplier(KTableValueGetterSupplier<K, V1> parentValueGetterSupplier) {
-        this.parentValueGetterSupplier = parentValueGetterSupplier;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 212b1c9..f8f00b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -22,12 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
+class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
 
+    private final KTableImpl<K, ?, V> parent;
     private final Predicate<K, V> predicate;
     private final boolean filterOut;
 
-    public KTableFilter(Predicate<K, V> predicate, boolean filterOut) {
+    public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterOut) {
+        this.parent = parent;
         this.predicate = predicate;
         this.filterOut = filterOut;
     }
@@ -38,8 +40,11 @@ class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
-        return new KTableDerivedValueGetterSupplier<K, V, V>(parentValueGetterSupplier) {
+    public KTableValueGetterSupplier<K, V> view() {
+
+        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+        return new KTableValueGetterSupplier<K, V>() {
 
             public KTableValueGetter<K, V> get() {
                 return new KTableFilterValueGetter(parentValueGetterSupplier.get());
@@ -74,10 +79,12 @@ class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
             this.parentGetter = parentGetter;
         }
 
+        @Override
         public void init(ProcessorContext context) {
             parentGetter.init(context);
         }
 
+        @Override
         public V get(K key) {
             return computeNewValue(key, parentGetter.get(key));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 47c9b09..308e4f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -23,10 +23,11 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
-import java.util.Collections;
+import java.util.Set;
 
 /**
  * The implementation class of KTable
@@ -34,7 +35,7 @@ import java.util.Collections;
  * @param <S> the source's (parent's) value type
  * @param <V> the value type
  */
-public class KTableImpl<K, S, V> implements KTable<K, V> {
+public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
 
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
@@ -44,13 +45,22 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
 
     public static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
-    protected final KStreamBuilder topology;
-    public final String name;
+    public static final String SINK_NAME = "KTABLE-SINK-";
+
+    public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
+
+    public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
+
+    public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
+
+    public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
+
+    public static final String LEFTJOIN_NAME = "KTABLE-LEFTJOIN-";
+
+    public static final String MERGE_NAME = "KTABLE-MERGE-";
+
     public final KTableProcessorSupplier<K, S, V> processorSupplier;
-    private final String sourceNode;
 
-    private final KTableImpl<K, ?, S> parent;
-    private final String topic;
     private final Serializer<K> keySerializer;
     private final Serializer<V> valSerializer;
     private final Deserializer<K> keyDeserializer;
@@ -59,72 +69,53 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
     public KTableImpl(KStreamBuilder topology,
                       String name,
                       KTableProcessorSupplier<K, S, V> processorSupplier,
-                      String sourceNode,
-                      KTableImpl<K, ?, S> parent) {
-        this(topology, name, processorSupplier, sourceNode, null, null, null, null, null, parent);
+                      Set<String> sourceNodes) {
+        this(topology, name, processorSupplier, sourceNodes, null, null, null, null);
     }
 
     public KTableImpl(KStreamBuilder topology,
                       String name,
                       KTableProcessorSupplier<K, S, V> processorSupplier,
-                      String sourceNode,
-                      String topic,
+                      Set<String> sourceNodes,
                       Serializer<K> keySerializer,
                       Serializer<V> valSerializer,
                       Deserializer<K> keyDeserializer,
                       Deserializer<V> valDeserializer) {
-        this(topology, name, processorSupplier, sourceNode, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer, null);
-    }
-
-    private KTableImpl(KStreamBuilder topology,
-                       String name,
-                       KTableProcessorSupplier<K, S, V> processorSupplier,
-                       String sourceNode,
-                       String topic,
-                       Serializer<K> keySerializer,
-                       Serializer<V> valSerializer,
-                       Deserializer<K> keyDeserializer,
-                       Deserializer<V> valDeserializer,
-                       KTableImpl<K, ?, S> parent) {
-        this.topology = topology;
-        this.name = name;
+        super(topology, name, sourceNodes);
         this.processorSupplier = processorSupplier;
-        this.sourceNode = sourceNode;
-        this.topic = topic;
         this.keySerializer = keySerializer;
         this.valSerializer = valSerializer;
         this.keyDeserializer = keyDeserializer;
         this.valDeserializer = valDeserializer;
-        this.parent = parent;
     }
 
     @Override
     public KTable<K, V> filter(Predicate<K, V> predicate) {
         String name = topology.newName(FILTER_NAME);
-        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, false);
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
         topology.addProcessor(name, processorSupplier, this.name);
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
     }
 
     @Override
     public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
         String name = topology.newName(FILTER_NAME);
-        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, true);
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
 
         topology.addProcessor(name, processorSupplier, this.name);
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
     }
 
     @Override
     public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
         String name = topology.newName(MAPVALUES_NAME);
-        KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(mapper);
+        KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper);
 
         topology.addProcessor(name, processorSupplier, this.name);
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
     }
 
     @Override
@@ -133,9 +124,7 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
                                 Serializer<V> valSerializer,
                                 Deserializer<K> keyDeserializer,
                                 Deserializer<V> valDeserializer) {
-        String sendName = topology.newName(KStreamImpl.SINK_NAME);
-
-        topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+        to(topic, keySerializer, valSerializer);
 
         return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic);
     }
@@ -147,14 +136,12 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
 
     @Override
     public void to(String topic) {
-        String name = topology.newName(KStreamImpl.SINK_NAME);
-
-        topology.addSink(name, topic, this.name);
+        to(topic, null, null);
     }
 
     @Override
     public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
-        String name = topology.newName(KStreamImpl.SINK_NAME);
+        String name = topology.newName(SINK_NAME);
 
         topology.addSink(name, topic, keySerializer, valSerializer, this.name);
     }
@@ -165,25 +152,85 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
 
         topology.addProcessor(name, new KStreamPassThrough(), this.name);
 
-        return new KStreamImpl<>(topology, name, Collections.singleton(sourceNode));
+        return new KStreamImpl<>(topology, name, sourceNodes);
     }
 
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
-        if (parent != null) {
-            return processorSupplier.view(parent.valueGetterSupplier());
-        } else {
-            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+        if (processorSupplier instanceof KTableSource) {
+            final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             synchronized (source) {
                 if (!source.isMaterialized()) {
                     StateStoreSupplier storeSupplier =
-                            new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+                        new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
                     // mark this state is non internal hence it is read directly from a user topic
                     topology.addStateStore(storeSupplier, false, name);
                     source.materialize();
                 }
             }
-            return new KTableSourceValueGetterSupplier<>(topic);
+            return new KTableSourceValueGetterSupplier<>(source.topic);
+        } else {
+            return processorSupplier.view();
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+        String joinThisName = topology.newName(JOINTHIS_NAME);
+        String joinOtherName = topology.newName(JOINOTHER_NAME);
+        String joinMergeName = topology.newName(MERGE_NAME);
+
+        KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+        KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+        KTableMerge<K, R> joinMerge = new KTableMerge<>(
+                new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+        );
+
+        topology.addProcessor(joinThisName, joinThis, this.name);
+        topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
+        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+
+        return new KTableImpl<>(topology, name, joinThis, allSourceNodes);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+        String joinThisName = topology.newName(OUTERTHIS_NAME);
+        String joinOtherName = topology.newName(OUTEROTHER_NAME);
+        String joinMergeName = topology.newName(MERGE_NAME);
+
+        KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+        KTableKTableLeftJoin<K, R, V1, V> joinOther = new KTableKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+        KTableMerge<K, R> joinMerge = new KTableMerge<>(
+                new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+        );
+
+        topology.addProcessor(joinThisName, joinThis, this.name);
+        topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
+        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+
+        return new KTableImpl<>(topology, name, joinMerge, allSourceNodes);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+        String name = topology.newName(LEFTJOIN_NAME);
+
+        KTableKTableLeftJoin<K, R, V, V1> leftJoin = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+
+        topology.addProcessor(name, leftJoin, this.name);
+
+        return new KTableImpl<>(topology, name, leftJoin, allSourceNodes);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
new file mode 100644
index 0000000..058e75d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableKTableJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+
+    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    private final ValueJoiner<V1, V2, V> joiner;
+
+    KTableKTableJoin(KTableImpl<K, ?, V1> table1,
+                     KTableImpl<K, ?, V2> table2,
+                     ValueJoiner<V1, V2, V> joiner) {
+        this.valueGetterSupplier1 = table1.valueGetterSupplier();
+        this.valueGetterSupplier2 = table2.valueGetterSupplier();
+        this.joiner = joiner;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        return new KTableKTableJoinProcessor(valueGetterSupplier2.get());
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        return new KTableValueGetterSupplier<K, V>() {
+
+            public KTableValueGetter<K, V> get() {
+                return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+            }
+
+        };
+    }
+
+    private class KTableKTableJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final KTableValueGetter<K, V2> valueGetter;
+
+        public KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            valueGetter.init(context);
+        }
+
+        @Override
+        public void process(K key, V1 value1) {
+            V newValue = null;
+
+            if (value1 != null) {
+                V2 value2 = valueGetter.get(key);
+
+                if (value2 != null)
+                    newValue = joiner.apply(value1, value2);
+            }
+
+            context().forward(key, newValue);
+        }
+    }
+
+    private class KTableKTableJoinValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V1> valueGetter1;
+        private final KTableValueGetter<K, V2> valueGetter2;
+
+        public KTableKTableJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+            this.valueGetter1 = valueGetter1;
+            this.valueGetter2 = valueGetter2;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            valueGetter1.init(context);
+            valueGetter2.init(context);
+        }
+
+        @Override
+        public V get(K key) {
+            V1 value1 = valueGetter1.get(key);
+
+            if (value1 != null) {
+                V2 value2 = valueGetter2.get(key);
+                return joiner.apply(value1, value2);
+            } else {
+                return null;
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
new file mode 100644
index 0000000..fe4e280
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableKTableLeftJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+
+    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    private final ValueJoiner<V1, V2, V> joiner;
+
+    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1,
+                         KTableImpl<K, ?, V2> table2,
+                         ValueJoiner<V1, V2, V> joiner) {
+        this.valueGetterSupplier1 = table1.valueGetterSupplier();
+        this.valueGetterSupplier2 = table2.valueGetterSupplier();
+        this.joiner = joiner;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        return new KTableKTableLeftJoinProcessor(valueGetterSupplier2.get());
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        return new KTableValueGetterSupplier<K, V>() {
+
+            public KTableValueGetter<K, V> get() {
+                return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+            }
+
+        };
+    }
+
+    private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final KTableValueGetter<K, V2> valueGetter;
+
+        public KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            valueGetter.init(context);
+        }
+
+        @Override
+        public void process(K key, V1 value1) {
+            V newValue = null;
+
+            if (value1 != null)
+                newValue = joiner.apply(value1, valueGetter.get(key));
+
+            context().forward(key, newValue);
+        }
+
+    }
+
+    private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V1> valueGetter1;
+        private final KTableValueGetter<K, V2> valueGetter2;
+
+        public KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+            this.valueGetter1 = valueGetter1;
+            this.valueGetter2 = valueGetter2;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            valueGetter1.init(context);
+            valueGetter2.init(context);
+        }
+
+        @Override
+        public V get(K key) {
+            V1 value1 = valueGetter1.get(key);
+
+            if (value1 != null) {
+                V2 value2 = valueGetter2.get(key);
+                return joiner.apply(value1, value2);
+            } else {
+                return null;
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 0d14390..300cce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -22,11 +22,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
+class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
 
+    private final KTableImpl<K1, ?, V1> parent;
     private final ValueMapper<V1, V2> mapper;
 
-    public KTableMapValues(ValueMapper<V1, V2> mapper) {
+    public KTableMapValues(KTableImpl<K1, ?, V1> parent, ValueMapper<V1, V2> mapper) {
+        this.parent = parent;
         this.mapper = mapper;
     }
 
@@ -36,8 +38,10 @@ class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
     }
 
     @Override
-    public KTableValueGetterSupplier<K1, V2> view(KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier) {
-        return new KTableDerivedValueGetterSupplier<K1, V1, V2>(parentValueGetterSupplier) {
+    public KTableValueGetterSupplier<K1, V2> view() {
+        final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+        return new KTableValueGetterSupplier<K1, V2>() {
 
             public KTableValueGetter<K1, V2> get() {
                 return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
@@ -72,10 +76,12 @@ class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
             this.parentGetter = parentGetter;
         }
 
+        @Override
         public void init(ProcessorContext context) {
             parentGetter.init(context);
         }
 
+        @Override
         public V2 get(K1 key) {
             return computeNewValue(parentGetter.get(key));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
new file mode 100644
index 0000000..715ea6c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableMerge<K, V> implements KTableProcessorSupplier<K, V, V> {
+
+    private final KTableImpl<K, ?, V>[] parents;
+
+    public KTableMerge(KTableImpl<K, ?, V>... parents) {
+        this.parents = parents.clone();
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KTableMergeProcessor<>();
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        final KTableValueGetterSupplier<K, V>[] valueGetterSuppliers = new KTableValueGetterSupplier[parents.length];
+
+        for (int i = 0; i < parents.length; i++) {
+            valueGetterSuppliers[i] = parents[i].valueGetterSupplier();
+        }
+        return new KTableValueGetterSupplier<K, V>() {
+
+            public KTableValueGetter<K, V> get() {
+                KTableValueGetter<K, V>[] valueGetters = new KTableValueGetter[valueGetterSuppliers.length];
+
+                for (int i = 0; i < valueGetters.length; i++) {
+                    valueGetters[i] = valueGetterSuppliers[i].get();
+                }
+                return new KTableMergeValueGetter(valueGetters);
+            }
+
+        };
+    }
+
+    private class KTableMergeProcessor<K, V> extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, value);
+        }
+    }
+
+    private class KTableMergeValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V>[] valueGetters;
+
+        public KTableMergeValueGetter(KTableValueGetter<K, V>[] valueGetters) {
+            this.valueGetters = valueGetters;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            for (KTableValueGetter<K, V> valueGetter : valueGetters) {
+                valueGetter.init(context);
+            }
+        }
+
+        @Override
+        public V get(K key) {
+            for (KTableValueGetter<K, V> valueGetter : valueGetters) {
+                V value = valueGetter.get(key);
+                if (value != null)
+                    return value;
+            }
+            return null;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index cc6467f..2fe5c15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-public abstract class KTableProcessorSupplier<K, V, T> implements ProcessorSupplier<K, V> {
+public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, V> {
 
-    public abstract KTableValueGetterSupplier<K, T> view(KTableValueGetterSupplier<K, V> parentValueGetterFactory);
+    KTableValueGetterSupplier<K, T> view();
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 93790ed..60b2d5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 
-public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> {
+public class KTableSource<K, V> implements KTableProcessorSupplier<K, V, V> {
 
-    private final String topic;
+    public final String topic;
 
     private boolean materialized = false;
 
@@ -46,7 +46,7 @@ public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+    public KTableValueGetterSupplier<K, V> view() {
         throw new IllegalStateException("a view cannot be define on the ktable source");
     }
 


Mime
View raw message