kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Code refacotring in KTable-KTable Join (#4486)
Date Tue, 30 Jan 2018 01:48:56 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fdc14da  MINOR: Code refacotring in KTable-KTable Join (#4486)
fdc14da is described below

commit fdc14dacedc3d3497dcc222b44f539e6355b45a9
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Jan 29 17:48:53 2018 -0800

    MINOR: Code refacotring in KTable-KTable Join (#4486)
    
    1. Rename KTableKTableJoin to KTableKTableInnerJoin. Also removed abstract from other
joins.
    2. Merge KTableKTableJoinValueGetter.java into KTableKTableInnerJoin.
    3. Use set instead of arrays in the stores function, to avoid duplicate stores to be connected
to processors.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/kstream/internals/KTableImpl.java      |  4 +-
 ...TableKTableAbstractJoinValueGetterSupplier.java |  5 +-
 ...eKTableJoin.java => KTableKTableInnerJoin.java} | 51 ++++++++++++++----
 .../kstream/internals/KTableKTableJoinMerger.java  | 21 +++-----
 .../internals/KTableKTableJoinValueGetter.java     | 62 ----------------------
 .../kstream/internals/KTableKTableLeftJoin.java    | 10 ++--
 .../kstream/internals/KTableKTableOuterJoin.java   | 10 ++--
 .../kstream/internals/KTableKTableRightJoin.java   | 10 ++--
 ...oinTest.java => KTableKTableInnerJoinTest.java} |  2 +-
 9 files changed, 70 insertions(+), 105 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 0cf56ef..a746d31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -756,8 +756,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
 
         if (!leftOuter) { // inner
-            joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other,
joiner);
-            joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other,
this, reverseJoiner(joiner));
+            joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, V1>)
other, joiner);
+            joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, V1>) other,
this, reverseJoiner(joiner));
         } else if (!rightOuter) { // left
             joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>)
other, joiner);
             joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other,
this, reverseJoiner(joiner));
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
index d36920a..f2de67f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
@@ -16,8 +16,9 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> implements
KTableValueGetterSupplier<K, R> {
     final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
@@ -33,7 +34,7 @@ public abstract class KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2>
     public String[] storeNames() {
         final String[] storeNames1 = valueGetterSupplier1.storeNames();
         final String[] storeNames2 = valueGetterSupplier2.storeNames();
-        final ArrayList<String> stores = new ArrayList<>(storeNames1.length +
storeNames2.length);
+        final Set<String> stores = new HashSet<>(storeNames1.length + storeNames2.length);
         Collections.addAll(stores, storeNames1);
         Collections.addAll(stores, storeNames2);
         return stores.toArray(new String[stores.size()]);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
similarity index 59%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index c424f4f..e170175 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
V2> {
+class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
V1, V2> {
 
     private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K,
V1, K>() {
         @Override
@@ -31,7 +31,7 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1,
         }
     };
 
-    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2,
ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
+    KTableKTableInnerJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2,
ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -42,20 +42,17 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
+        return new KTableKTableInnerJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
     }
 
-    private class KTableKTableAbstractJoinValueGetterSupplier extends org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableInnerJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
+        KTableKTableInnerJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
         public KTableValueGetter<K, R> get() {
-            return new KTableKTableJoinValueGetter<>(valueGetterSupplier1.get(),
-                                                     valueGetterSupplier2.get(),
-                                                     joiner,
-                                                     keyValueMapper);
+            return new KTableKTableInnerJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
         }
     }
 
@@ -63,7 +60,7 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+        KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -100,4 +97,38 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1,
         }
     }
 
+    private class KTableKTableInnerJoinValueGetter implements KTableValueGetter<K, R>
{
+
+        private final KTableValueGetter<K, V1> valueGetter1;
+        private final KTableValueGetter<K, V2> valueGetter2;
+
+        KTableKTableInnerJoinValueGetter(final KTableValueGetter<K, V1> valueGetter1,
+                                         final KTableValueGetter<K, V2> valueGetter2)
{
+            this.valueGetter1 = valueGetter1;
+            this.valueGetter2 = valueGetter2;
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            valueGetter1.init(context);
+            valueGetter2.init(context);
+        }
+
+        @Override
+        public R get(final K key) {
+            final V1 value1 = valueGetter1.get(key);
+
+            if (value1 != null) {
+                V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
+
+                if (value2 != null) {
+                    return joiner.apply(value1, value2);
+                } else {
+                    return null;
+                }
+            } else {
+                return null;
+            }
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index d27b8bd..6400750 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -21,6 +21,10 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V>
{
 
     private final KTableImpl<K, ?, V> parent1;
@@ -56,21 +60,12 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
 
                 @Override
                 public String[] storeNames() {
-                    // we need to allow the downstream processor to be able to access both
ends of the joining table's value getters
                     final String[] storeNames1 = parent1.valueGetterSupplier().storeNames();
                     final String[] storeNames2 = parent2.valueGetterSupplier().storeNames();
-
-                    final String[] stores = new String[storeNames1.length + storeNames2.length];
-                    int i = 0;
-                    for (final String storeName : storeNames1) {
-                        stores[i] = storeName;
-                        i++;
-                    }
-                    for (final String storeName : storeNames2) {
-                        stores[i] = storeName;
-                        i++;
-                    }
-                    return stores;
+                    final Set<String> stores = new HashSet<>(storeNames1.length
+ storeNames2.length);
+                    Collections.addAll(stores, storeNames1);
+                    Collections.addAll(stores, storeNames2);
+                    return stores.toArray(new String[stores.size()]);
                 }
             };
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
deleted file mode 100644
index c8c3eb7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-class KTableKTableJoinValueGetter<K1, V1, K2, V2, R> implements KTableValueGetter<K1,
R> {
-
-    private final KTableValueGetter<K1, V1> valueGetter1;
-    private final KTableValueGetter<K2, V2> valueGetter2;
-    private final ValueJoiner<? super V1, ? super V2, ? extends R>  joiner;
-    private final KeyValueMapper<K1, V1, K2> keyValueMapper;
-
-    public KTableKTableJoinValueGetter(final KTableValueGetter<K1, V1> valueGetter1,
-                                       final KTableValueGetter<K2, V2> valueGetter2,
-                                       final ValueJoiner<? super V1, ? super V2, ? extends
R>  joiner,
-                                       final KeyValueMapper<K1, V1, K2> keyValueMapper)
{
-        this.valueGetter1 = valueGetter1;
-        this.valueGetter2 = valueGetter2;
-        this.joiner = joiner;
-        this.keyValueMapper = keyValueMapper;
-    }
-
-    @Override
-    public void init(ProcessorContext context) {
-        valueGetter1.init(context);
-        valueGetter2.init(context);
-    }
-
-    @Override
-    public R get(K1 key) {
-        R newValue = null;
-        V1 value1 = valueGetter1.get(key);
-
-        if (value1 != null) {
-            V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
-
-            if (value2 != null) {
-                newValue = joiner.apply(value1, value2);
-            }
-        }
-
-        return newValue;
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 33aef02..bb3e652 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -34,12 +34,12 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
+        return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
     }
 
-    private class KTableKTableLeftAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableLeftJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
+        KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
@@ -53,7 +53,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter)
{
+        KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -94,7 +94,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
 
-        public KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1,
KTableValueGetter<K, V2> valueGetter2) {
+        KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K,
V2> valueGetter2) {
             this.valueGetter1 = valueGetter1;
             this.valueGetter2 = valueGetter2;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index d2e1d79..e7c170e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -34,12 +34,12 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
+        return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
     }
 
-    private class KTableKTableOuterAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableOuterJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
+        KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
@@ -52,7 +52,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> valueGetter)
{
+        KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -94,7 +94,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
 
-        public KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1> valueGetter1,
KTableValueGetter<K, V2> valueGetter2) {
+        KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K,
V2> valueGetter2) {
             this.valueGetter1 = valueGetter1;
             this.valueGetter2 = valueGetter2;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index f4c840b..c540cf9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -35,12 +35,12 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
+        return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
     }
 
-    private class KTableKTableRightAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
+    private class KTableKTableRightJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
 
-        public KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2)
{
+        KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
@@ -53,7 +53,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> valueGetter)
{
+        KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
 
-        public KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1> valueGetter1,
KTableValueGetter<K, V2> valueGetter2) {
+        KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K,
V2> valueGetter2) {
             this.valueGetter1 = valueGetter1;
             this.valueGetter2 = valueGetter2;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
similarity index 99%
rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 09d4aa0..b890e2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class KTableKTableJoinTest {
+public class KTableKTableInnerJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message