kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3192: Add unwindowed aggregations for KStream; and make all example code executable
Date Mon, 29 Feb 2016 22:03:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a7312971a -> 845c6eae1


http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 785d3e8..be5c728 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -430,6 +430,32 @@ public class TopologyBuilder {
     }
 
     /**
+     * Connects a list of processors.
+     *
+     * NOTE this function would not be needed by developers working with the processor APIs,
but only used
+     * for the high-level DSL parsing functionalities.
+     *
+     * @param processorNames the names of the processors
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder connectProcessors(String... processorNames) {
+        if (processorNames.length < 2)
+            throw new TopologyBuilderException("At least two processors need to participate
in the connection.");
+
+        for (String processorName : processorNames) {
+            if (!nodeFactories.containsKey(processorName))
+                throw new TopologyBuilderException("Processor " + processorName + " is not
added yet.");
+
+        }
+
+        String firstProcessorName = processorNames[0];
+
+        nodeGrouper.unite(firstProcessorName, Arrays.copyOfRange(processorNames, 1, processorNames.length));
+
+        return this;
+    }
+
+    /**
      * Adds an internal topic
      *
      * @param topicName the name of the topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 0787204..62bf307 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -87,6 +87,14 @@ public class RecordQueue {
             timeTracker.addElement(stampedRecord);
         }
 
+        // update the partition timestamp if its currently
+        // tracked min timestamp has exceeded its value; this will
+        // usually only take effect for the first added batch
+        long timestamp = timeTracker.get();
+
+        if (timestamp > partitionTime)
+            partitionTime = timestamp;
+
         return size();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 440efc8..1cc5287 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -118,7 +118,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
                     (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
-                    (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
+                    configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer)
configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7392d9e..e35eb89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -279,7 +279,6 @@ public class StreamThread extends Thread {
         log.info("Shutting down stream thread [" + this.getName() + "]");
 
         // Exceptions should not prevent this call from going through all shutdown steps
-
         try {
             commitAll();
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
deleted file mode 100644
index 93c5df6..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import java.io.File;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamAggregateTest {
-
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
-
-    private class StringAdd implements Aggregator<String, String, String> {
-
-        @Override
-        public String apply(String aggKey, String value, String aggregate) {
-            return aggregate + "+" + value;
-        }
-    }
-
-    private class StringInit implements Initializer<String> {
-
-        @Override
-        public String apply() {
-            return "0";
-        }
-    }
-
-    @Test
-    public void testAggBasic() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-            String topic1 = "topic1";
-
-            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer,
topic1);
-            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new
StringInit(), new StringAdd(),
-                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                    strSerializer,
-                    strSerializer,
-                    strDeserializer,
-                    strDeserializer);
-
-            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-            table2.toStream().process(proc2);
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-
-            driver.setTime(0L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(1L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(2L);
-            driver.process(topic1, "C", "3");
-            driver.setTime(3L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(4L);
-            driver.process(topic1, "A", "1");
-
-            driver.setTime(5L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(6L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(7L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(8L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(9L);
-            driver.process(topic1, "C", "3");
-
-            driver.setTime(10L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(11L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(12L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(13L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(14L);
-            driver.process(topic1, "C", "3");
-
-            assertEquals(Utils.mkList(
-                    "[A@0]:0+1",
-                    "[B@0]:0+2",
-                    "[C@0]:0+3",
-                    "[D@0]:0+4",
-                    "[A@0]:0+1+1",
-
-                    "[A@0]:0+1+1+1", "[A@5]:0+1",
-                    "[B@0]:0+2+2", "[B@5]:0+2",
-                    "[D@0]:0+4+4", "[D@5]:0+4",
-                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                    "[C@0]:0+3+3", "[C@5]:0+3",
-
-                    "[A@5]:0+1+1", "[A@10]:0+1",
-                    "[B@5]:0+2+2+2", "[B@10]:0+2",
-                    "[D@5]:0+4+4", "[D@10]:0+4",
-                    "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
-                    "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testJoin() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-            String topic1 = "topic1";
-            String topic2 = "topic2";
-
-            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer,
topic1);
-            KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new
StringInit(), new StringAdd(),
-                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                    strSerializer,
-                    strSerializer,
-                    strDeserializer,
-                    strDeserializer);
-
-            MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
-            table1.toStream().process(proc1);
-
-            KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer,
topic2);
-            KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new
StringInit(), new StringAdd(),
-                    HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
-                    strSerializer,
-                    strSerializer,
-                    strDeserializer,
-                    strDeserializer);
-
-            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-            table2.toStream().process(proc2);
-
-
-            MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
-            table1.join(table2, new ValueJoiner<String, String, String>() {
-                @Override
-                public String apply(String p1, String p2) {
-                    return p1 + "%" + p2;
-                }
-            }).toStream().process(proc3);
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-
-            driver.setTime(0L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(1L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(2L);
-            driver.process(topic1, "C", "3");
-            driver.setTime(3L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(4L);
-            driver.process(topic1, "A", "1");
-
-            proc1.checkAndClearResult(
-                    "[A@0]:0+1",
-                    "[B@0]:0+2",
-                    "[C@0]:0+3",
-                    "[D@0]:0+4",
-                    "[A@0]:0+1+1"
-            );
-            proc2.checkAndClearResult();
-            proc3.checkAndClearResult(
-                    "[A@0]:null",
-                    "[B@0]:null",
-                    "[C@0]:null",
-                    "[D@0]:null",
-                    "[A@0]:null"
-            );
-
-            driver.setTime(5L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(6L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(7L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(8L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(9L);
-            driver.process(topic1, "C", "3");
-
-            proc1.checkAndClearResult(
-                    "[A@0]:0+1+1+1", "[A@5]:0+1",
-                    "[B@0]:0+2+2", "[B@5]:0+2",
-                    "[D@0]:0+4+4", "[D@5]:0+4",
-                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                    "[C@0]:0+3+3", "[C@5]:0+3"
-            );
-            proc2.checkAndClearResult();
-            proc3.checkAndClearResult(
-                    "[A@0]:null", "[A@5]:null",
-                    "[B@0]:null", "[B@5]:null",
-                    "[D@0]:null", "[D@5]:null",
-                    "[B@0]:null", "[B@5]:null",
-                    "[C@0]:null", "[C@5]:null"
-            );
-
-            driver.setTime(0L);
-            driver.process(topic2, "A", "a");
-            driver.setTime(1L);
-            driver.process(topic2, "B", "b");
-            driver.setTime(2L);
-            driver.process(topic2, "C", "c");
-            driver.setTime(3L);
-            driver.process(topic2, "D", "d");
-            driver.setTime(4L);
-            driver.process(topic2, "A", "a");
-
-            proc1.checkAndClearResult();
-            proc2.checkAndClearResult(
-                    "[A@0]:0+a",
-                    "[B@0]:0+b",
-                    "[C@0]:0+c",
-                    "[D@0]:0+d",
-                    "[A@0]:0+a+a"
-            );
-            proc3.checkAndClearResult(
-                    "[A@0]:0+1+1+1%0+a",
-                    "[B@0]:0+2+2+2%0+b",
-                    "[C@0]:0+3+3%0+c",
-                    "[D@0]:0+4+4%0+d",
-                    "[A@0]:0+1+1+1%0+a+a");
-
-            driver.setTime(5L);
-            driver.process(topic2, "A", "a");
-            driver.setTime(6L);
-            driver.process(topic2, "B", "b");
-            driver.setTime(7L);
-            driver.process(topic2, "D", "d");
-            driver.setTime(8L);
-            driver.process(topic2, "B", "b");
-            driver.setTime(9L);
-            driver.process(topic2, "C", "c");
-
-            proc1.checkAndClearResult();
-            proc2.checkAndClearResult(
-                    "[A@0]:0+a+a+a", "[A@5]:0+a",
-                    "[B@0]:0+b+b", "[B@5]:0+b",
-                    "[D@0]:0+d+d", "[D@5]:0+d",
-                    "[B@0]:0+b+b+b", "[B@5]:0+b+b",
-                    "[C@0]:0+c+c", "[C@5]:0+c"
-            );
-            proc3.checkAndClearResult(
-                    "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
-                    "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
-                    "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
-                    "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
-                    "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
-            );
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
new file mode 100644
index 0000000..9e0745a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamWindowAggregateTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    private class StringAdd implements Aggregator<String, String, String> {
+
+        @Override
+        public String apply(String aggKey, String value, String aggregate) {
+            return aggregate + "+" + value;
+        }
+    }
+
+    private class StringInit implements Initializer<String> {
+
+        @Override
+        public String apply() {
+            return "0";
+        }
+    }
+
+    @Test
+    public void testAggBasic() throws Exception {
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+
+            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer,
topic1);
+            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new
StringInit(), new StringAdd(),
+                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer);
+
+            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.setTime(0L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(1L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(2L);
+            driver.process(topic1, "C", "3");
+            driver.setTime(3L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(4L);
+            driver.process(topic1, "A", "1");
+
+            driver.setTime(5L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(6L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(7L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(8L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(9L);
+            driver.process(topic1, "C", "3");
+
+            driver.setTime(10L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(11L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(12L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(13L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(14L);
+            driver.process(topic1, "C", "3");
+
+            assertEquals(Utils.mkList(
+                    "[A@0]:0+1",
+                    "[B@0]:0+2",
+                    "[C@0]:0+3",
+                    "[D@0]:0+4",
+                    "[A@0]:0+1+1",
+
+                    "[A@0]:0+1+1+1", "[A@5]:0+1",
+                    "[B@0]:0+2+2", "[B@5]:0+2",
+                    "[D@0]:0+4+4", "[D@5]:0+4",
+                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                    "[C@0]:0+3+3", "[C@5]:0+3",
+
+                    "[A@5]:0+1+1", "[A@10]:0+1",
+                    "[B@5]:0+2+2+2", "[B@10]:0+2",
+                    "[D@5]:0+4+4", "[D@10]:0+4",
+                    "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+                    "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer,
topic1);
+            KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new
StringInit(), new StringAdd(),
+                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer);
+
+            MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+            table1.toStream().process(proc1);
+
+            KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer,
topic2);
+            KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new
StringInit(), new StringAdd(),
+                    HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer);
+
+            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+
+            MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+            table1.join(table2, new ValueJoiner<String, String, String>() {
+                @Override
+                public String apply(String p1, String p2) {
+                    return p1 + "%" + p2;
+                }
+            }).toStream().process(proc3);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.setTime(0L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(1L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(2L);
+            driver.process(topic1, "C", "3");
+            driver.setTime(3L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(4L);
+            driver.process(topic1, "A", "1");
+
+            proc1.checkAndClearResult(
+                    "[A@0]:0+1",
+                    "[B@0]:0+2",
+                    "[C@0]:0+3",
+                    "[D@0]:0+4",
+                    "[A@0]:0+1+1"
+            );
+            proc2.checkAndClearResult();
+            proc3.checkAndClearResult(
+                    "[A@0]:null",
+                    "[B@0]:null",
+                    "[C@0]:null",
+                    "[D@0]:null",
+                    "[A@0]:null"
+            );
+
+            driver.setTime(5L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(6L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(7L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(8L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(9L);
+            driver.process(topic1, "C", "3");
+
+            proc1.checkAndClearResult(
+                    "[A@0]:0+1+1+1", "[A@5]:0+1",
+                    "[B@0]:0+2+2", "[B@5]:0+2",
+                    "[D@0]:0+4+4", "[D@5]:0+4",
+                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                    "[C@0]:0+3+3", "[C@5]:0+3"
+            );
+            proc2.checkAndClearResult();
+            proc3.checkAndClearResult(
+                    "[A@0]:null", "[A@5]:null",
+                    "[B@0]:null", "[B@5]:null",
+                    "[D@0]:null", "[D@5]:null",
+                    "[B@0]:null", "[B@5]:null",
+                    "[C@0]:null", "[C@5]:null"
+            );
+
+            driver.setTime(0L);
+            driver.process(topic2, "A", "a");
+            driver.setTime(1L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(2L);
+            driver.process(topic2, "C", "c");
+            driver.setTime(3L);
+            driver.process(topic2, "D", "d");
+            driver.setTime(4L);
+            driver.process(topic2, "A", "a");
+
+            proc1.checkAndClearResult();
+            proc2.checkAndClearResult(
+                    "[A@0]:0+a",
+                    "[B@0]:0+b",
+                    "[C@0]:0+c",
+                    "[D@0]:0+d",
+                    "[A@0]:0+a+a"
+            );
+            proc3.checkAndClearResult(
+                    "[A@0]:0+1+1+1%0+a",
+                    "[B@0]:0+2+2+2%0+b",
+                    "[C@0]:0+3+3%0+c",
+                    "[D@0]:0+4+4%0+d",
+                    "[A@0]:0+1+1+1%0+a+a");
+
+            driver.setTime(5L);
+            driver.process(topic2, "A", "a");
+            driver.setTime(6L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(7L);
+            driver.process(topic2, "D", "d");
+            driver.setTime(8L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(9L);
+            driver.process(topic2, "C", "c");
+
+            proc1.checkAndClearResult();
+            proc2.checkAndClearResult(
+                    "[A@0]:0+a+a+a", "[A@5]:0+a",
+                    "[B@0]:0+b+b", "[B@5]:0+b",
+                    "[D@0]:0+d+d", "[D@5]:0+d",
+                    "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+                    "[C@0]:0+c+c", "[C@5]:0+c"
+            );
+            proc3.checkAndClearResult(
+                    "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+                    "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+                    "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+                    "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+                    "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+            );
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 59711db..ec85ed7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -65,6 +65,7 @@ public class KTableAggregateTest {
         }
     }
 
+
     @Test
     public void testAggBasic() throws Exception {
         final File baseDir = Files.createTempDirectory("test").toFile();

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 9d0c0e2..61f6dbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -77,7 +77,7 @@ public class PartitionGroupTest {
         assertEquals(6, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
+        assertEquals(1L, group.timestamp());
 
         StampedRecord record;
         PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -89,7 +89,7 @@ public class PartitionGroupTest {
         assertEquals(5, group.numBuffered());
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
+        assertEquals(2L, group.timestamp());
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 614e2c7..36f38e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -58,7 +58,7 @@ public class RecordQueueTest {
         queue.addRawRecords(list1, timestampExtractor);
 
         assertEquals(3, queue.size());
-        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
+        assertEquals(1L, queue.timestamp());
 
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
@@ -107,7 +107,7 @@ public class RecordQueueTest {
         queue.addRawRecords(list3, timestampExtractor);
 
         assertEquals(3, queue.size());
-        assertEquals(3L, queue.timestamp());
+        assertEquals(4L, queue.timestamp());
 
         // poll one record again, the timestamp should advance now
         assertEquals(4L, queue.poll().timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 94f0ce3..0430566 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -124,8 +124,8 @@ public class StreamTaskTest {
             assertEquals(0, source2.numReceived);
 
             assertEquals(4, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(1, source2.numReceived);
+            assertEquals(2, source1.numReceived);
+            assertEquals(0, source2.numReceived);
 
             assertEquals(3, task.process());
             assertEquals(2, source1.numReceived);
@@ -188,14 +188,21 @@ public class StreamTaskTest {
             assertTrue(consumer.paused().contains(partition2));
 
             assertEquals(7, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(1, source2.numReceived);
+            assertEquals(2, source1.numReceived);
+            assertEquals(0, source2.numReceived);
 
             assertEquals(1, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition1));
+            assertTrue(consumer.paused().contains(partition2));
 
             assertEquals(6, task.process());
-            assertEquals(2, source1.numReceived);
+            assertEquals(3, source1.numReceived);
+            assertEquals(0, source2.numReceived);
+
+            assertEquals(1, consumer.paused().size());
+            assertTrue(consumer.paused().contains(partition2));
+
+            assertEquals(5, task.process());
+            assertEquals(3, source1.numReceived);
             assertEquals(1, source2.numReceived);
 
             assertEquals(0, consumer.paused().size());


Mime
View raw message