kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: join test for windowed keys
Date Wed, 27 Jan 2016 07:03:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9ffa907d7 -> 22de0a8ab


MINOR: join test for windowed keys

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #814 from ymatsuda/windowed_key_join_test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22de0a8a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22de0a8a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22de0a8a

Branch: refs/heads/trunk
Commit: 22de0a8ab5d0e84fa40016754f9a8eff8193aa89
Parents: 9ffa907
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Jan 26 23:03:42 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 26 23:03:42 2016 -0800

----------------------------------------------------------------------
 .../kstream/internals/KStreamAggregate.java     |   4 +-
 .../kstream/internals/KStreamAggregateTest.java | 155 +++++++++++++++++++
 2 files changed, 157 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/22de0a8a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 26002f5..49f3e71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -161,10 +161,10 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
             K key = windowedKey.value();
             W window = (W) windowedKey.window();
 
-            // this iterator should only contain one element
+            // this iterator should contain at most one element
             Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
 
-            return iter.next().value;
+            return iter.hasNext() ? iter.next().value : null;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/22de0a8a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
index ecc303d..8a81113 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -137,4 +138,158 @@ public class KStreamAggregateTest {
             Utils.delete(baseDir);
         }
     }
+
+    @Test
+    public void testJoin() throws Exception {
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
+            KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringCanonizer(),
+                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer);
+
+            MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+            table1.toStream().process(proc1);
+
+            KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2);
+            KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringCanonizer(),
+                    HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer);
+
+            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+
+            MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+            table1.join(table2, new ValueJoiner<String, String, String>() {
+                @Override
+                public String apply(String p1, String p2) {
+                    return p1 + "%" + p2;
+                }
+            }).toStream().process(proc3);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.setTime(0L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(1L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(2L);
+            driver.process(topic1, "C", "3");
+            driver.setTime(3L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(4L);
+            driver.process(topic1, "A", "1");
+
+            proc1.checkAndClearResult(
+                    "[A@0]:0+1",
+                    "[B@0]:0+2",
+                    "[C@0]:0+3",
+                    "[D@0]:0+4",
+                    "[A@0]:0+1+1"
+            );
+            proc2.checkAndClearResult();
+            proc3.checkAndClearResult(
+                    "[A@0]:null",
+                    "[B@0]:null",
+                    "[C@0]:null",
+                    "[D@0]:null",
+                    "[A@0]:null"
+            );
+
+            driver.setTime(5L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(6L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(7L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(8L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(9L);
+            driver.process(topic1, "C", "3");
+
+            proc1.checkAndClearResult(
+                    "[A@0]:0+1+1+1", "[A@5]:0+1",
+                    "[B@0]:0+2+2", "[B@5]:0+2",
+                    "[D@0]:0+4+4", "[D@5]:0+4",
+                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                    "[C@0]:0+3+3", "[C@5]:0+3"
+            );
+            proc2.checkAndClearResult();
+            proc3.checkAndClearResult(
+                    "[A@0]:null", "[A@5]:null",
+                    "[B@0]:null", "[B@5]:null",
+                    "[D@0]:null", "[D@5]:null",
+                    "[B@0]:null", "[B@5]:null",
+                    "[C@0]:null", "[C@5]:null"
+            );
+
+            driver.setTime(0L);
+            driver.process(topic2, "A", "a");
+            driver.setTime(1L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(2L);
+            driver.process(topic2, "C", "c");
+            driver.setTime(3L);
+            driver.process(topic2, "D", "d");
+            driver.setTime(4L);
+            driver.process(topic2, "A", "a");
+
+            proc1.checkAndClearResult();
+            proc2.checkAndClearResult(
+                    "[A@0]:0+a",
+                    "[B@0]:0+b",
+                    "[C@0]:0+c",
+                    "[D@0]:0+d",
+                    "[A@0]:0+a+a"
+            );
+            proc3.checkAndClearResult(
+                    "[A@0]:0+1+1+1%0+a",
+                    "[B@0]:0+2+2+2%0+b",
+                    "[C@0]:0+3+3%0+c",
+                    "[D@0]:0+4+4%0+d",
+                    "[A@0]:0+1+1+1%0+a+a");
+
+            driver.setTime(5L);
+            driver.process(topic2, "A", "a");
+            driver.setTime(6L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(7L);
+            driver.process(topic2, "D", "d");
+            driver.setTime(8L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(9L);
+            driver.process(topic2, "C", "c");
+
+            proc1.checkAndClearResult();
+            proc2.checkAndClearResult(
+                    "[A@0]:0+a+a+a", "[A@5]:0+a",
+                    "[B@0]:0+b+b", "[B@5]:0+b",
+                    "[D@0]:0+d+d", "[D@5]:0+d",
+                    "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+                    "[C@0]:0+c+c", "[C@5]:0+c"
+            );
+            proc3.checkAndClearResult(
+                    "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+                    "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+                    "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+                    "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+                    "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+            );
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
 }


Mime
View raw message