kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for queryable stores
Date Wed, 03 May 2017 23:15:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a3952aee4 -> ec9e4eafa


http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 7ce0b54..6fc6bd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -42,6 +43,7 @@ import java.util.regex.Pattern;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 public class KStreamBuilderTest {
 
@@ -85,6 +87,19 @@ public class KStreamBuilderTest {
     }
 
     @Test
+    public void testNewStoreName() {
+        assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
+        assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
+        assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
+
+        KStreamBuilder newBuilder = new KStreamBuilder();
+
+        assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
+        assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
+        assertEquals("Z-STATE-STORE-0000000002", newBuilder.newStoreName("Z-"));
+    }
+
+    @Test
     public void testMerge() {
         String topic1 = "topic-1";
         String topic2 = "topic-2";
@@ -151,16 +166,22 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldNotMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
-        builder.table("topic1", "table1");
-        builder.table("topic2", null);
+    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
+        KTable table1 = builder.table("topic1", "table1");
+        KTable table2 = builder.table("topic2", (String) null);
 
         final ProcessorTopology topology = builder.build(null);
 
-        assertEquals(1, topology.stateStores().size());
+        assertEquals(2, topology.stateStores().size());
         assertEquals("table1", topology.stateStores().get(0).name());
-        assertEquals(1, topology.storeToChangelogTopic().size());
+
+        final String internalStoreName = topology.stateStores().get(1).name();
+        assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME));
+        assertEquals(2, topology.storeToChangelogTopic().size());
         assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
+        assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));
+        assertEquals(table1.queryableStoreName(), "table1");
+        assertNull(table2.queryableStoreName());
     }
 
     @Test
@@ -174,11 +195,7 @@ public class KStreamBuilderTest {
         assertEquals("globalTable", stateStores.get(0).name());
     }
 
-    @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
-        builder.globalTable("table", "globalTable");
-        builder.globalTable("table2", "globalTable2");
-
+    private void doBuildGlobalTopologyWithAllGlobalTables() throws Exception {
         final ProcessorTopology topology = builder.buildGlobalStateTopology();
 
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -189,6 +206,22 @@ public class KStreamBuilderTest {
     }
 
     @Test
+    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+        builder.globalTable("table", "globalTable");
+        builder.globalTable("table2", "globalTable2");
+
+        doBuildGlobalTopologyWithAllGlobalTables();
+    }
+
+    @Test
+    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
+        builder.globalTable("table");
+        builder.globalTable("table2");
+
+        doBuildGlobalTopologyWithAllGlobalTables();
+    }
+
+    @Test
     public void shouldAddGlobalTablesToEachGroup() throws Exception {
         final String one = "globalTable";
         final String two = "globalTable2";
@@ -324,4 +357,4 @@ public class KStreamBuilderTest {
         assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
         assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 8d1a789..f21c7d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -34,6 +35,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -52,6 +54,7 @@ import java.util.Map;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class KGroupedStreamImplTest {
 
@@ -80,8 +83,8 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(null, "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreNameOnReduce() throws Exception {
+    @Test
+    public void shouldAllowNullStoreNameOnReduce() throws Exception {
         groupedStream.reduce(MockReducer.STRING_ADDER, (String) null);
     }
 
@@ -96,6 +99,17 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreSupplierOnCount() throws Exception {
+        groupedStream.count((StateStoreSupplier<KeyValueStore>) null);
+    }
+
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreSupplierOnWindowedCount() throws Exception {
+        groupedStream.count(TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullReducerWithWindowedReduce() throws Exception {
         groupedStream.reduce(null, TimeWindows.of(10), "store");
     }
@@ -105,8 +119,8 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, (Windows) null, "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreNameWithWindowedReduce() throws Exception {
+    @Test
+    public void shouldAllowNullStoreNameWithWindowedReduce() throws Exception {
         groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), (String) null);
     }
 
@@ -125,8 +139,8 @@ public class KGroupedStreamImplTest {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
+    @Test
+    public void shouldAllowNullStoreNameOnAggregate() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), null);
     }
 
@@ -150,8 +164,8 @@ public class KGroupedStreamImplTest {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Serdes.String(), "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
+    @Test
+    public void shouldAllowNullStoreNameOnWindowedAggregate() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
     }
 
@@ -162,13 +176,33 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), null);
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
+    }
+
+    private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) throws Exception {
+        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "2", "2");
+        driver.setTime(30);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(70);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(90);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(100);
+        driver.process(TOPIC, "1", "1");
+        driver.flushState();
+        assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
+        assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
+        assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
     }
 
     @Test
     public void shouldAggregateSessionWindows() throws Exception {
         final Map<Windowed<String>, Integer> results = new HashMap<>();
-        groupedStream.aggregate(new Initializer<Integer>() {
+        KTable table = groupedStream.aggregate(new Initializer<Integer>() {
             @Override
             public Integer apply() {
                 return 0;
@@ -183,43 +217,49 @@ public class KGroupedStreamImplTest {
             public Integer apply(final String aggKey, final Integer aggOne, final Integer aggTwo) {
                 return aggOne + aggTwo;
             }
-        }, SessionWindows.with(30), Serdes.Integer(), "session-store")
-                .foreach(new ForeachAction<Windowed<String>, Integer>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final Integer value) {
-                        results.put(key, value);
-                    }
-                });
+        }, SessionWindows.with(30), Serdes.Integer(), "session-store");
+        table.foreach(new ForeachAction<Windowed<String>, Integer>() {
+            @Override
+            public void apply(final Windowed<String> key, final Integer value) {
+                results.put(key, value);
+            }
+        });
 
-        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "2", "2");
-        driver.setTime(30);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(70);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(90);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(100);
-        driver.process(TOPIC, "1", "1");
-        driver.flushState();
-        assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
-        assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
-        assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
+        doAggregateSessionWindows(results);
+        assertEquals(table.queryableStoreName(), "session-store");
     }
 
     @Test
-    public void shouldCountSessionWindows() throws Exception {
-        final Map<Windowed<String>, Long> results = new HashMap<>();
-        groupedStream.count(SessionWindows.with(30), "session-store")
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.put(key, value);
-                    }
-                });
+    public void shouldAggregateSessionWindowsWithInternalStoreName() throws Exception {
+        final Map<Windowed<String>, Integer> results = new HashMap<>();
+        KTable table = groupedStream.aggregate(new Initializer<Integer>() {
+            @Override
+            public Integer apply() {
+                return 0;
+            }
+        }, new Aggregator<String, String, Integer>() {
+            @Override
+            public Integer apply(final String aggKey, final String value, final Integer aggregate) {
+                return aggregate + 1;
+            }
+        }, new Merger<String, Integer>() {
+            @Override
+            public Integer apply(final String aggKey, final Integer aggOne, final Integer aggTwo) {
+                return aggOne + aggTwo;
+            }
+        }, SessionWindows.with(30), Serdes.Integer());
+        table.foreach(new ForeachAction<Windowed<String>, Integer>() {
+            @Override
+            public void apply(final Windowed<String> key, final Integer value) {
+                results.put(key, value);
+            }
+        });
+
+        doAggregateSessionWindows(results);
+        assertNull(table.queryableStoreName());
+    }
+
+    private void doCountSessionWindows(final Map<Windowed<String>, Long> results) throws Exception {
         driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
@@ -240,22 +280,34 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldReduceSessionWindows() throws Exception {
-        final Map<Windowed<String>, String> results = new HashMap<>();
-        groupedStream.reduce(
-                new Reducer<String>() {
-                    @Override
-                    public String apply(final String value1, final String value2) {
-                        return value1 + ":" + value2;
-                    }
-                }, SessionWindows.with(30),
-                "session-store")
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                    }
-                });
+    public void shouldCountSessionWindows() throws Exception {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        KTable table = groupedStream.count(SessionWindows.with(30), "session-store");
+        table.foreach(new ForeachAction<Windowed<String>, Long>() {
+            @Override
+            public void apply(final Windowed<String> key, final Long value) {
+                results.put(key, value);
+            }
+        });
+        doCountSessionWindows(results);
+        assertEquals(table.queryableStoreName(), "session-store");
+    }
+
+    @Test
+    public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        KTable table = groupedStream.count(SessionWindows.with(30));
+        table.foreach(new ForeachAction<Windowed<String>, Long>() {
+            @Override
+            public void apply(final Windowed<String> key, final Long value) {
+                results.put(key, value);
+            }
+        });
+        doCountSessionWindows(results);
+        assertNull(table.queryableStoreName());
+    }
+
+    private void doReduceSessionWindows(final Map<Windowed<String>, String> results) throws Exception {
         driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "A");
@@ -275,6 +327,48 @@ public class KGroupedStreamImplTest {
         assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
     }
 
+    @Test
+    public void shouldReduceSessionWindows() throws Exception {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        KTable table = groupedStream.reduce(
+                new Reducer<String>() {
+                    @Override
+                    public String apply(final String value1, final String value2) {
+                        return value1 + ":" + value2;
+                    }
+                }, SessionWindows.with(30),
+                "session-store");
+        table.foreach(new ForeachAction<Windowed<String>, String>() {
+            @Override
+            public void apply(final Windowed<String> key, final String value) {
+                results.put(key, value);
+            }
+        });
+        doReduceSessionWindows(results);
+        assertEquals(table.queryableStoreName(), "session-store");
+    }
+
+    @Test
+    public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        KTable table = groupedStream.reduce(
+                new Reducer<String>() {
+                    @Override
+                    public String apply(final String value1, final String value2) {
+                        return value1 + ":" + value2;
+                    }
+                }, SessionWindows.with(30));
+        table.foreach(new ForeachAction<Windowed<String>, String>() {
+            @Override
+            public void apply(final Windowed<String> key, final String value) {
+                results.put(key, value);
+            }
+        });
+        doReduceSessionWindows(results);
+        assertNull(table.queryableStoreName());
+    }
+
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullReducerWhenReducingSessionWindows() throws Exception {
         groupedStream.reduce(null, SessionWindows.with(10), "store");
@@ -285,8 +379,8 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows) null, "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStoreNameWhenReducingSessionWindows() throws Exception {
+    @Test
+    public void shouldAcceptNullStoreNameWhenReducingSessionWindows() throws Exception {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (String) null);
     }
 
@@ -323,11 +417,11 @@ public class KGroupedStreamImplTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT,
-                                MockAggregator.TOSTRING_ADDER,
-                                null,
-                                SessionWindows.with(10),
-                                Serdes.String(),
-                                "storeName");
+                MockAggregator.TOSTRING_ADDER,
+                null,
+                SessionWindows.with(10),
+                Serdes.String(),
+                "storeName");
     }
 
     @Test(expected = NullPointerException.class)
@@ -340,8 +434,8 @@ public class KGroupedStreamImplTest {
         }, null, Serdes.String(), "storeName");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
+    @Test
+    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -375,8 +469,8 @@ public class KGroupedStreamImplTest {
         groupedStream.count((SessionWindows) null, "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStoreNameWhenCountingSessionWindows() throws Exception {
+    @Test
+    public void shouldAcceptNullStoreNameWhenCountingSessionWindows() throws Exception {
         groupedStream.count(SessionWindows.with(90), (String) null);
     }
 
@@ -390,19 +484,7 @@ public class KGroupedStreamImplTest {
         groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
     }
 
-    @Test
-    public void shouldCountWindowed() throws Exception {
-        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
-        groupedStream.count(
-                TimeWindows.of(500L),
-                "aggregate-by-key-windowed")
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.add(KeyValue.pair(key, value));
-                    }
-                });
-
+    private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) throws Exception {
         driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(0);
         driver.process(TOPIC, "1", "A");
@@ -423,4 +505,35 @@ public class KGroupedStreamImplTest {
                 KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
         )));
     }
+
+    @Test
+    public void shouldCountWindowed() throws Exception {
+        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+        groupedStream.count(
+                TimeWindows.of(500L),
+                "aggregate-by-key-windowed")
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.add(KeyValue.pair(key, value));
+                    }
+                });
+
+        doCountWindowed(results);
+    }
+
+    @Test
+    public void shouldCountWindowedWithInternalStoreName() throws Exception {
+        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+        groupedStream.count(
+                TimeWindows.of(500L))
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.add(KeyValue.pair(key, value));
+                    }
+                });
+
+        doCountWindowed(results);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 4934204..1e49b22 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -38,6 +40,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 
 public class KGroupedTableImplTest {
@@ -61,8 +64,13 @@ public class KGroupedTableImplTest {
         driver = null;
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStoreNameOnAggregate() throws Exception {
+    @Test
+    public void shouldAllowNullStoreNameOnCount()  {
+        groupedTable.count((String) null);
+    }
+
+    @Test
+    public void shouldAllowNullStoreNameOnAggregate() throws Exception {
         groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, (String) null);
     }
 
@@ -96,8 +104,8 @@ public class KGroupedTableImplTest {
         groupedTable.reduce(MockReducer.STRING_ADDER, null, "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStoreNameOnReduce() throws Exception {
+    @Test
+    public void shouldAllowNullStoreNameOnReduce() throws Exception {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (String) null);
     }
 
@@ -108,24 +116,10 @@ public class KGroupedTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierOnReduce() throws Exception {
-        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (String) null);
+        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (StateStoreSupplier<KeyValueStore>) null);
     }
 
-    @Test
-    public void shouldReduce() throws Exception {
-        final String topic = "input";
-        final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
-            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(String key, Number value) {
-                    return KeyValue.pair(key, value.intValue());
-                }
-            };
-
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
-            .groupBy(intProjection)
-            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
-
+    private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) throws Exception {
         final Map<String, Integer> results = new HashMap<>();
         reduced.foreach(new ForeachAction<String, Integer>() {
             @Override
@@ -152,4 +146,42 @@ public class KGroupedTableImplTest {
         assertEquals(Integer.valueOf(5), results.get("A"));
         assertEquals(Integer.valueOf(6), results.get("B"));
     }
+
+    @Test
+    public void shouldReduce() throws Exception {
+        final String topic = "input";
+        final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
+            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
+                @Override
+                public KeyValue<String, Integer> apply(String key, Number value) {
+                    return KeyValue.pair(key, value.intValue());
+                }
+            };
+
+        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+            .groupBy(intProjection)
+            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
+
+        doShouldReduce(reduced, topic);
+        assertEquals(reduced.queryableStoreName(), "reduced");
+    }
+
+    @Test
+    public void shouldReduceWithInternalStoreName() throws Exception {
+        final String topic = "input";
+        final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
+            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
+                @Override
+                public KeyValue<String, Integer> apply(String key, Number value) {
+                    return KeyValue.pair(key, value.intValue());
+                }
+            };
+
+        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+            .groupBy(intProjection)
+            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
+
+        doShouldReduce(reduced, topic);
+        assertNull(reduced.queryableStoreName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 23325c4..2c37230 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -212,18 +212,7 @@ public class KTableAggregateTest {
                 ), proc.processed);
     }
 
-    @Test
-    public void testCount() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
-        final String input = "count-test-input";
-        final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
-
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
-                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
-                .count("count")
-                .toStream()
-                .process(proc);
-
+    private void testCountHelper(final KStreamBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) throws IOException {
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(input, "A", "green");
@@ -240,12 +229,42 @@ public class KTableAggregateTest {
 
 
         assertEquals(Utils.mkList(
-                 "green:1",
-                 "green:2",
-                 "green:1", "blue:1",
-                 "yellow:1",
-                 "green:2"
-                 ), proc.processed);
+            "green:1",
+            "green:2",
+            "green:1", "blue:1",
+            "yellow:1",
+            "green:2"
+        ), proc.processed);
+    }
+
+    @Test
+    public void testCount() throws IOException {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String input = "count-test-input";
+        final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
+
+        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
+                .count("count")
+                .toStream()
+                .process(proc);
+
+        testCountHelper(builder, input, proc);
+    }
+
+    @Test
+    public void testCountWithInternalStore() throws IOException {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String input = "count-test-input";
+        final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
+
+        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
+            .count()
+            .toStream()
+            .process(proc);
+
+        testCountHelper(builder, input, proc);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 3ab4300..c6721f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -57,6 +57,28 @@ public class KTableFilterTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
+    private void doTestKTable(final KStreamBuilder builder, final KTable<String, Integer> table2,
+                              final KTable<String, Integer> table3, final String topic1) {
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+        table3.toStream().process(proc3);
+
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
+
+        driver.process(topic1, "A", 1);
+        driver.process(topic1, "B", 2);
+        driver.process(topic1, "C", 3);
+        driver.process(topic1, "D", 4);
+        driver.flushState();
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
+        driver.flushState();
+
+        proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
+        proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
+    }
+
     @Test
     public void testKTable() {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -78,53 +100,41 @@ public class KTableFilterTest {
             }
         });
 
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
-        MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
-        table3.toStream().process(proc3);
-
-        driver = new KStreamTestDriver(builder, stateDir);
-
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 2);
-        driver.process(topic1, "C", 3);
-        driver.process(topic1, "D", 4);
-        driver.flushState();
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
-
-        proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
-        proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
+        doTestKTable(builder, table2, table3, topic1);
     }
 
     @Test
-    public void testValueGetter() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+    public void testQueryableKTable() {
+        final KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";
 
-        KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
-        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(String key, Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
-        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(String key, Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
+        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+
+        KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        }, "anyStoreNameFilter");
+        KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        });
+
+        doTestKTable(builder, table2, table3, topic1);
+    }
 
+    private void doTestValueGetter(final KStreamBuilder builder,
+                                   final KTableImpl<String, Integer, Integer> table2,
+                                   final KTableImpl<String, Integer, Integer> table3,
+                                   final String topic1) throws IOException {
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
 
         KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
         KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
@@ -178,7 +188,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testNotSendingOldValue() throws IOException {
+    public void testValueGetter() throws IOException {
         KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";
@@ -192,14 +202,54 @@ public class KTableFilterTest {
                         return (value % 2) == 0;
                     }
                 });
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+
+        doTestValueGetter(builder, table2, table3, topic1);
+    }
 
+    @Test
+    public void testQueryableValueGetter() throws IOException {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean test(String key, Integer value) {
+                    return (value % 2) == 0;
+                }
+            }, "anyStoreNameFilter");
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean test(String key, Integer value) {
+                    return (value % 2) == 0;
+                }
+            });
+
+        doTestValueGetter(builder, table2, table3, topic1);
+    }
+
+    private void doTestNotSendingOldValue(final KStreamBuilder builder,
+                                          final KTableImpl<String, Integer, Integer> table1,
+                                          final KTableImpl<String, Integer, Integer> table2,
+                                          final String topic1) throws IOException {
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
 
         builder.addProcessor("proc1", proc1, table1.name);
         builder.addProcessor("proc2", proc2, table2.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 1);
@@ -227,8 +277,9 @@ public class KTableFilterTest {
         proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
     }
 
+
     @Test
-    public void testSendingOldValue() throws IOException {
+    public void testNotSendingOldValue() throws IOException {
         KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";
@@ -243,6 +294,32 @@ public class KTableFilterTest {
                     }
                 });
 
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void testQueryableNotSendingOldValue() throws IOException {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean test(String key, Integer value) {
+                    return (value % 2) == 0;
+                }
+            }, "anyStoreNameFilter");
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    private void doTestSendingOldValue(final KStreamBuilder builder,
+                                       final KTableImpl<String, Integer, Integer> table1,
+                                       final KTableImpl<String, Integer, Integer> table2,
+                                       final String topic1) throws IOException {
         table2.enableSendingOldValues();
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
@@ -251,7 +328,7 @@ public class KTableFilterTest {
         builder.addProcessor("proc1", proc1, table1.name);
         builder.addProcessor("proc2", proc2, table2.name);
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 1);
@@ -280,23 +357,47 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testSkipNullOnMaterialization() throws IOException {
-        // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
+    public void testSendingOldValue() throws IOException {
         KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
-        KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
-            new Predicate<String, String>() {
+        KTableImpl<String, Integer, Integer> table1 =
+                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+
+        doTestSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void testQueryableSendingOldValue() throws IOException {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+            new Predicate<String, Integer>() {
                 @Override
-                public boolean test(String key, String value) {
-                    return value.equalsIgnoreCase("accept");
+                public boolean test(String key, Integer value) {
+                    return (value % 2) == 0;
                 }
-            }).groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
-            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
+            }, "anyStoreNameFilter");
+
+        doTestSendingOldValue(builder, table1, table2, topic1);
+    }
 
+    private void doTestSkipNullOnMaterialization(final KStreamBuilder builder,
+                                                 final KTableImpl<String, String, String> table1,
+                                                 final KTableImpl<String, String, String> table2,
+                                                 final String topic1) throws IOException {
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
 
@@ -314,6 +415,48 @@ public class KTableFilterTest {
     }
 
     @Test
+    public void testSkipNullOnMaterialization() throws IOException {
+        // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTableImpl<String, String, String> table1 =
+            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
+            new Predicate<String, String>() {
+                @Override
+                public boolean test(String key, String value) {
+                    return value.equalsIgnoreCase("accept");
+                }
+            }).groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
+
+        doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void testQueryableSkipNullOnMaterialization() throws IOException {
+        // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTableImpl<String, String, String> table1 =
+            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
+            new Predicate<String, String>() {
+                @Override
+                public boolean test(String key, String value) {
+                    return value.equalsIgnoreCase("accept");
+                }
+            }, "anyStoreNameFilter").groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
+
+        doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
+    }
+
+    @Test
     public void testTypeVariance() throws Exception {
         Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 7aef28a..1fbaebf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -25,8 +25,10 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -419,13 +421,13 @@ public class KTableImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicInThrough() throws Exception {
-        table.through(null, "store");
+    public void shouldAllowNullTopicInThrough() throws Exception {
+        table.through((String) null, "store");
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStoreInThrough() throws Exception {
-        table.through("topic", null);
+    @Test
+    public void shouldAllowNullStoreInThrough() throws Exception {
+        table.through("topic", (String) null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -438,6 +440,26 @@ public class KTableImplTest {
         table.join(null, MockValueJoiner.TOSTRING_JOINER);
     }
 
+    @Test
+    public void shouldAllowNullStoreInJoin() throws Exception {
+        table.join(table, MockValueJoiner.TOSTRING_JOINER, null, (String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreSupplierInJoin() throws Exception {
+        table.join(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreSupplierInLeftJoin() throws Exception {
+        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreSupplierInOuterJoin() throws Exception {
+        table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerJoin() throws Exception {
         table.join(table, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index e3d3e95..fdc07f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -67,23 +67,10 @@ public class KTableKTableJoinTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    @Test
-    public void testJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-        final KTable<Integer, String> table1;
-        final KTable<Integer, String> table2;
-        final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
-        joined.toStream().process(processor);
-
+    private void doTestJoin(final KStreamBuilder builder,
+                            final int[] expectedKeys,
+                            final MockProcessorSupplier<Integer, String> processor,
+                            final KTable<Integer, String> joined) {
         final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
@@ -91,7 +78,7 @@ public class KTableKTableJoinTest {
 
         final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir);
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         final KTableValueGetter<Integer, String> getter = getterSupplier.get();
@@ -175,7 +162,7 @@ public class KTableKTableJoinTest {
     }
 
     @Test
-    public void testNotSendingOldValues() throws Exception {
+    public void testJoin() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -183,22 +170,60 @@ public class KTableKTableJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> processor;
 
+        processor = new MockProcessorSupplier<>();
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
+        joined.toStream().process(processor);
 
-        proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        doTestJoin(builder, expectedKeys, processor, joined);
+    }
 
-        driver = new KStreamTestDriver(builder, stateDir);
-        driver.setTime(0L);
 
-        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+    @Test
+    public void testQueryableJoin() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> processor;
+
+        processor = new MockProcessorSupplier<>();
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
+        joined.toStream().process(processor);
+
+        doTestJoin(builder, expectedKeys, processor, joined);
+    }
+
+    private void doTestSendingOldValues(final KStreamBuilder builder,
+                                        final int[] expectedKeys,
+                                        final KTable<Integer, String> table1,
+                                        final KTable<Integer, String> table2,
+                                        final MockProcessorSupplier<Integer, String> proc,
+                                        final KTable<Integer, String> joined,
+                                        final boolean sendOldValues) {
 
+
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.Integer(), Serdes.String());
+        driver.setTime(0L);
+
+        if (!sendOldValues) {
+            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        } else {
+            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        }
         // push two items to the primary stream. the other table is empty
 
         for (int i = 0; i < 2; i++) {
@@ -259,7 +284,7 @@ public class KTableKTableJoinTest {
     }
 
     @Test
-    public void testSendingOldValues() throws Exception {
+    public void testNotSendingOldValues() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -272,73 +297,54 @@ public class KTableKTableJoinTest {
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
-
-        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
-
         proc = new MockProcessorSupplier<>();
         builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-        driver = new KStreamTestDriver(builder, stateDir);
-        driver.setTime(0L);
+        doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
 
-        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+    }
 
-        // push two items to the primary stream. the other table is empty
+    @Test
+    public void testQueryableNotSendingOldValues() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult();
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        // push two items to the other stream. this should produce two items.
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> proc;
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-        // push all four items to the primary stream. this should produce two items.
+        doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0)", "1:(XX1+Y1<-X1+Y1)");
+    }
 
-        // push all items to the other stream. this should produce four items.
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(XX0+YY0<-XX0+Y0)", "1:(XX1+YY1<-XX1+Y1)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+    @Test
+    public void testSendingOldValues() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        // push all four items to the primary stream. this should produce four items.
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-XX0+YY0)", "1:(X1+YY1<-XX1+YY1)", "2:(X2+YY2<-XX2+YY2)", "3:(X3+YY3<-XX3+YY3)");
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> proc;
 
-        // push two items with null to the other stream as deletes. this should produce two item.
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-        // push all four items to the primary stream. this should produce two items.
+        doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true);
 
-        for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
     private KeyValue<Integer, String> kv(Integer key, String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 3b590c6..70b1a55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -58,6 +58,17 @@ public class KTableMapValuesTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
+    private void doTestKTable(final KStreamBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());
+
+        driver.process(topic1, "A", "1");
+        driver.process(topic1, "B", "2");
+        driver.process(topic1, "C", "3");
+        driver.process(topic1, "D", "4");
+        driver.flushState();
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+    }
+
     @Test
     public void testKTable() {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -75,50 +86,41 @@ public class KTableMapValuesTest {
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
 
-        driver = new KStreamTestDriver(builder, stateDir);
-
-        driver.process(topic1, "A", "1");
-        driver.process(topic1, "B", "2");
-        driver.process(topic1, "C", "3");
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+        doTestKTable(builder, topic1, proc2);
     }
 
     @Test
-    public void testValueGetter() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+    public void testQueryableKTable() {
+        final KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";
-        String topic2 = "topic2";
-        String storeName1 = "storeName1";
-        String storeName2 = "storeName2";
 
-        KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
-        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                new ValueMapper<String, Integer>() {
-                    @Override
-                    public Integer apply(String value) {
-                        return new Integer(value);
-                    }
-                });
-        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(String key, Integer value) {
-                        return (value % 2) == 0;
-                    }
-                });
-        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                table1.through(stringSerde, stringSerde, topic2, storeName2);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
+            @Override
+            public Integer apply(CharSequence value) {
+                return value.charAt(0) - 48;
+            }
+        }, Serdes.Integer(), "anyName");
+
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
 
+        doTestKTable(builder, topic1, proc2);
+    }
+
+    private void doTestValueGetter(final KStreamBuilder builder,
+                                   final String topic1,
+                                   final KTableImpl<String, String, String> table1,
+                                   final KTableImpl<String, String, Integer> table2,
+                                   final KTableImpl<String, Integer, Integer> table3,
+                                   final KTableImpl<String, String, String> table4) {
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
         KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
-        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());
         KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
         KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
@@ -209,6 +211,68 @@ public class KTableMapValuesTest {
     }
 
     @Test
+    public void testValueGetter() throws IOException {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String storeName1 = "storeName1";
+        String storeName2 = "storeName2";
+
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                table1.through(stringSerde, stringSerde, topic2, storeName2);
+
+        doTestValueGetter(builder, topic1, table1, table2, table3, table4);
+    }
+
+    @Test
+    public void testQueryableValueGetter() throws IOException {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String storeName1 = "storeName1";
+        String storeName2 = "storeName2";
+
+        KTableImpl<String, String, String> table1 =
+            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+            new ValueMapper<String, Integer>() {
+                @Override
+                public Integer apply(String value) {
+                    return new Integer(value);
+                }
+            }, Serdes.Integer(), "anyMapName");
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean test(String key, Integer value) {
+                    return (value % 2) == 0;
+                }
+            }, "anyFilterName");
+        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+            table1.through(stringSerde, stringSerde, topic2, storeName2);
+
+        doTestValueGetter(builder, topic1, table1, table2, table3, table4);
+    }
+
+    @Test
     public void testNotSendingOldValue() throws IOException {
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index be9f7fb..41ab803 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -735,7 +735,7 @@ public class TopologyBuilderTest {
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
         final TopologyBuilder topologyBuilder = new TopologyBuilder()
-            .addGlobalStore(new MockStateStoreSupplier.MockStateStore("anyName", false),
+            .addGlobalStore(new MockStateStoreSupplier("anyName", false, false),
                 sameNameForSourceAndProcessor,
                 null,
                 null,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 65b3e2f..d19c91a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -210,11 +210,11 @@ public class ProcessorTopologyTest {
                 .withStringKeys().withStringValues().inMemory().disableLogging().build();
         final String global = "global";
         final String topic = "topic";
-        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) storeSupplier.get();
         final TopologyBuilder topologyBuilder = this.builder
-                .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
+                .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
 
         driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
+        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
         driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertEquals("value1", globalStore.get("key1"));


Mime
View raw message