kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3870: Expose state store names in DSL
Date Mon, 18 Jul 2016 19:12:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7a70c1a10 -> fbc518554


http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 20efd45..b37e5e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -27,14 +27,13 @@ import static org.junit.Assert.assertNotEquals;
 
 public class JoinWindowsTest {
 
-    private static String anyName = "window";
     private static long anySize = 123L;
     private static long anyOtherSize = 456L; // should be larger than anySize
 
     @Test
     public void shouldHaveSaneEqualsAndHashCode() {
-        JoinWindows w1 = JoinWindows.of("w1", anySize);
-        JoinWindows w2 = JoinWindows.of("w2", anySize);
+        JoinWindows w1 = JoinWindows.of(anySize);
+        JoinWindows w2 = JoinWindows.of(anySize);
 
         // Reflexive
         assertEquals(w1, w1);
@@ -45,36 +44,36 @@ public class JoinWindowsTest {
         assertEquals(w2, w1);
         assertEquals(w1.hashCode(), w2.hashCode());
 
-        JoinWindows w3 = JoinWindows.of("w3", w2.after).before(anyOtherSize);
-        JoinWindows w4 = JoinWindows.of("w4", anyOtherSize).after(w2.after);
+        JoinWindows w3 = JoinWindows.of(w2.after).before(anyOtherSize);
+        JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.after);
         assertEquals(w3, w4);
         assertEquals(w4, w3);
         assertEquals(w3.hashCode(), w4.hashCode());
 
         // Inequality scenarios
         assertNotEquals("must be false for null", null, w1);
-        assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1);
+        assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1);
         assertNotEquals("must be false for different types", new Object(), w1);
 
-        JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize", w1.after + 1);
+        JoinWindows differentWindowSize = JoinWindows.of(w1.after + 1);
         assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
 
-        JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize", w1.after).after(w1.after + 1);
+        JoinWindows differentWindowSize2 = JoinWindows.of(w1.after).after(w1.after + 1);
         assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1);
 
-        JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize", w1.after).before(w1.before + 1);
+        JoinWindows differentWindowSize3 = JoinWindows.of(w1.after).before(w1.before + 1);
         assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1);
     }
 
     @Test
     public void validWindows() {
-        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
+        JoinWindows.of(anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
             .before(anySize)                    // [ -anySize ; anyOtherSize ]
             .before(0)                          // [ 0 ; anyOtherSize ]
             .before(-anySize)                   // [ anySize ; anyOtherSize ]
             .before(-anyOtherSize);             // [ anyOtherSize ; anyOtherSize ]
 
-        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
+        JoinWindows.of(anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
             .after(anySize)                     // [ -anyOtherSize ; anySize ]
             .after(0)                           // [ -anyOtherSize ; 0 ]
             .after(-anySize)                    // [ -anyOtherSize ; -anySize ]
@@ -82,28 +81,18 @@ public class JoinWindowsTest {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void nameMustNotBeEmpty() {
-        JoinWindows.of("", anySize);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void nameMustNotBeNull() {
-        JoinWindows.of(null, anySize);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
     public void timeDifferenceMustNotBeNegative() {
-        JoinWindows.of(anyName, -1);
+        JoinWindows.of(-1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void afterBelowLower() {
-        JoinWindows.of(anyName, anySize).after(-anySize - 1);
+        JoinWindows.of(anySize).after(-anySize - 1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void beforeOverUpper() {
-        JoinWindows.of(anyName, anySize).before(-anySize - 1);
+        JoinWindows.of(anySize).before(-anySize - 1);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 5acd6e2..2bea16b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -29,13 +29,12 @@ import static org.junit.Assert.assertNotEquals;
 
 public class TimeWindowsTest {
 
-    private static String anyName = "window";
     private static long anySize = 123L;
 
     @Test
     public void shouldHaveSaneEqualsAndHashCode() {
-        TimeWindows w1 = TimeWindows.of("w1", anySize);
-        TimeWindows w2 = TimeWindows.of("w2", w1.size);
+        TimeWindows w1 = TimeWindows.of(anySize);
+        TimeWindows w2 = TimeWindows.of(w1.size);
 
         // Reflexive
         assertEquals(w1, w1);
@@ -47,62 +46,53 @@ public class TimeWindowsTest {
         assertEquals(w1.hashCode(), w2.hashCode());
 
         // Transitive
-        TimeWindows w3 = TimeWindows.of("w3", w2.size);
+        TimeWindows w3 = TimeWindows.of(w2.size);
         assertEquals(w2, w3);
         assertEquals(w1, w3);
         assertEquals(w1.hashCode(), w3.hashCode());
 
         // Inequality scenarios
         assertNotEquals("must be false for null", null, w1);
-        assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1);
+        assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1);
         assertNotEquals("must be false for different types", new Object(), w1);
 
-        TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1);
+        TimeWindows differentWindowSize = TimeWindows.of(w1.size + 1);
         assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
 
         TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1);
         assertNotEquals("must be false when advance intervals are different", differentAdvanceInterval, w1);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void nameMustNotBeEmpty() {
-        TimeWindows.of("", anySize);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void nameMustNotBeNull() {
-        TimeWindows.of(null, anySize);
-    }
 
     @Test(expected = IllegalArgumentException.class)
     public void windowSizeMustNotBeNegative() {
-        TimeWindows.of(anyName, -1);
+        TimeWindows.of(-1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void windowSizeMustNotBeZero() {
-        TimeWindows.of(anyName, 0);
+        TimeWindows.of(0);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void advanceIntervalMustNotBeNegative() {
-        TimeWindows.of(anyName, anySize).advanceBy(-1);
+        TimeWindows.of(anySize).advanceBy(-1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void advanceIntervalMustNotBeZero() {
-        TimeWindows.of(anyName, anySize).advanceBy(0);
+        TimeWindows.of(anySize).advanceBy(0);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void advanceIntervalMustNotBeLargerThanWindowSize() {
         long size = anySize;
-        TimeWindows.of(anyName, size).advanceBy(size + 1);
+        TimeWindows.of(size).advanceBy(size + 1);
     }
 
     @Test
     public void windowsForHoppingWindows() {
-        TimeWindows windows = TimeWindows.of(anyName, 12L).advanceBy(5L);
+        TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
         Map<Long, TimeWindow> matched = windows.windowsFor(21L);
         assertEquals(12L / 5L + 1, matched.size());
         assertEquals(new TimeWindow(10L, 22L), matched.get(10L));
@@ -112,7 +102,7 @@ public class TimeWindowsTest {
 
     @Test
     public void windowsForBarelyOverlappingHoppingWindows() {
-        TimeWindows windows = TimeWindows.of(anyName, 6L).advanceBy(5L);
+        TimeWindows windows = TimeWindows.of(6L).advanceBy(5L);
         Map<Long, TimeWindow> matched = windows.windowsFor(7L);
         assertEquals(1, matched.size());
         assertEquals(new TimeWindow(5L, 11L), matched.get(5L));
@@ -120,7 +110,7 @@ public class TimeWindowsTest {
 
     @Test
     public void windowsForTumblingWindows() {
-        TimeWindows windows = TimeWindows.of(anyName, 12L);
+        TimeWindows windows = TimeWindows.of(12L);
         Map<Long, TimeWindow> matched = windows.windowsFor(21L);
         assertEquals(1, matched.size());
         assertEquals(new TimeWindow(12L, 24L), matched.get(12L));

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
index da5f159..c1f4be6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
@@ -29,32 +29,21 @@ import static org.junit.Assert.assertTrue;
 
 public class UnlimitedWindowsTest {
 
-    private static String anyName = "window";
     private static long anyStartTime = 10L;
 
     @Test(expected = IllegalArgumentException.class)
-    public void nameMustNotBeEmpty() {
-        UnlimitedWindows.of("");
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void nameMustNotBeNull() {
-        UnlimitedWindows.of(null);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
     public void startTimeMustNotBeNegative() {
-        UnlimitedWindows.of(anyName).startOn(-1);
+        UnlimitedWindows.of().startOn(-1);
     }
 
     @Test
     public void startTimeCanBeZero() {
-        UnlimitedWindows.of(anyName).startOn(0);
+        UnlimitedWindows.of().startOn(0);
     }
 
     @Test
     public void shouldIncludeRecordsThatHappenedOnWindowStart() {
-        UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+        UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
         Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.start);
         assertEquals(1, matchedWindows.size());
         assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime));
@@ -62,7 +51,7 @@ public class UnlimitedWindowsTest {
 
     @Test
     public void shouldIncludeRecordsThatHappenedAfterWindowStart() {
-        UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+        UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
         long timestamp = w.start + 1;
         Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
         assertEquals(1, matchedWindows.size());
@@ -71,7 +60,7 @@ public class UnlimitedWindowsTest {
 
     @Test
     public void shouldExcludeRecordsThatHappenedBeforeWindowStart() {
-        UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+        UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
         long timestamp = w.start - 1;
         Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
         assertTrue(matchedWindows.isEmpty());

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 6242702..d5fc41b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -109,14 +109,14 @@ public class KStreamImplTest {
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-0", anyWindowSize), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde);
 
         KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-1", anyWindowSize), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde);
 
         stream4.to("topic-5");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index aa7d117..7175f63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -77,8 +77,7 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
-                intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -176,8 +175,7 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
-                intSerde, stringSerde, stringSerde);
+        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -277,8 +275,8 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
-                intSerde, stringSerde, stringSerde);
+
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 8e05da9..95d9ef6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -78,8 +78,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
-                intSerde, stringSerde, stringSerde);
+
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -157,8 +157,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
-                intSerde, stringSerde, stringSerde);
+
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 8bc9a77..f2fad78 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -75,7 +75,7 @@ public class KStreamKTableLeftJoinTest {
 
         processor = new MockProcessorSupplier<>();
         stream = builder.stream(intSerde, stringSerde, topic1);
-        table = builder.table(intSerde, stringSerde, topic2);
+        table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
         stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index db533e4..1a5de5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -75,8 +75,8 @@ public class KStreamWindowAggregateTest {
                                    strSerde)
                     .aggregate(MockInitializer.STRING_INIT,
                                MockAggregator.STRING_ADDER,
-                               TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
-                               strSerde);
+                               TimeWindows.of(10).advanceBy(5),
+                               strSerde, "topic1-Canonized");
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);
@@ -154,8 +154,8 @@ public class KStreamWindowAggregateTest {
                 stream1.groupByKey(strSerde, strSerde)
                     .aggregate(MockInitializer.STRING_INIT,
                                MockAggregator.STRING_ADDER,
-                               TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
-                               strSerde);
+                               TimeWindows.of(10).advanceBy(5),
+                               strSerde, "topic1-Canonized");
 
             MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
             table1.toStream().process(proc1);
@@ -165,8 +165,8 @@ public class KStreamWindowAggregateTest {
                 stream2.groupByKey(strSerde, strSerde)
                     .aggregate(MockInitializer.STRING_INIT,
                                MockAggregator.STRING_ADDER,
-                               TimeWindows.of("topic2-Canonized", 10).advanceBy(5),
-                               strSerde);
+                               TimeWindows.of(10).advanceBy(5),
+                               strSerde, "topic2-Canonized");
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index e5864ee..26e6a0f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -65,7 +65,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
                 stringSerde,
                 stringSerde
@@ -105,7 +105,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
         KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
             @Override
                 public KeyValue<String, String> apply(String key, String value) {
@@ -157,7 +157,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input)
+        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
                 .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
                 .count("count")
                 .toStream()

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index e328bae..d8dee30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -64,7 +64,7 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1);
+        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -104,7 +104,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -183,7 +183,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -232,7 +232,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -284,7 +284,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
         KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
             new Predicate<String, String>() {
                 @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index af131c2..e0cb190 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -36,7 +36,6 @@ import static org.junit.Assert.assertEquals;
 public class KTableForeachTest {
 
     final private String topicName = "topic";
-
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
@@ -78,7 +77,7 @@ public class KTableForeachTest {
 
         // When
         KStreamBuilder builder = new KStreamBuilder();
-        KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName);
+        KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName");
         table.foreach(action);
 
         // Then

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index ca3bbe1..6794bb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -72,8 +72,10 @@ public class KTableImplTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
+        String storeName1 = "storeName1";
+        String storeName2 = "storeName2";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, storeName1);
 
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -98,7 +100,7 @@ public class KTableImplTest {
         MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
         table3.toStream().process(proc3);
 
-        KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2);
+        KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2, storeName2);
 
         MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
         table4.toStream().process(proc4);
@@ -122,9 +124,11 @@ public class KTableImplTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
+        String storeName1 = "storeName1";
+        String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -140,7 +144,7 @@ public class KTableImplTest {
                     }
                 });
         KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                table1.through(stringSerde, stringSerde, topic2);
+                table1.through(stringSerde, stringSerde, topic2, storeName2);
 
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
@@ -241,13 +245,15 @@ public class KTableImplTest {
     public void testStateStoreLazyEval() throws IOException {
         String topic1 = "topic1";
         String topic2 = "topic2";
+        String storeName1 = "storeName1";
+        String storeName2 = "storeName2";
 
         final KStreamBuilder builder = new KStreamBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
         KTableImpl<String, String, String> table2 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2);
 
         KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
@@ -267,7 +273,7 @@ public class KTableImplTest {
         driver = new KStreamTestDriver(builder, stateDir, null, null);
         driver.setTime(0L);
 
-        // no state store should be created
+        // no state stores should be created
         assertEquals(0, driver.allStateStores().size());
     }
 
@@ -275,13 +281,15 @@ public class KTableImplTest {
     public void testStateStore() throws IOException {
         String topic1 = "topic1";
         String topic2 = "topic2";
+        String storeName1 = "storeName1";
+        String storeName2 = "storeName2";
 
         final KStreamBuilder builder = new KStreamBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
         KTableImpl<String, String, String> table2 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2);
 
         KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
@@ -315,11 +323,12 @@ public class KTableImplTest {
     @Test
     public void testRepartition() throws IOException {
         String topic1 = "topic1";
+        String storeName1 = "storeName1";
 
         final KStreamBuilder builder = new KStreamBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
 
         KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
                 .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 16015fe..3615b46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -46,6 +46,8 @@ public class KTableKTableJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
+    final private String storeName1 = "store-name-1";
+    final private String storeName2 = "store-name-2";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
@@ -78,8 +80,8 @@ public class KTableKTableJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
         joined.toStream().process(processor);
 
@@ -170,8 +172,8 @@ public class KTableKTableJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
@@ -251,8 +253,8 @@ public class KTableKTableJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 5132ce3..ec07116 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -46,6 +46,8 @@ public class KTableKTableLeftJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
+    final private String storeName1 = "store-name-1";
+    final private String storeName2 = "store-name-2";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
@@ -72,8 +74,8 @@ public class KTableKTableLeftJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1);
-        KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2);
+        KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
         MockProcessorSupplier<Integer, String> processor;
         processor = new MockProcessorSupplier<>();
@@ -166,8 +168,8 @@ public class KTableKTableLeftJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
@@ -247,8 +249,8 @@ public class KTableKTableLeftJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 3124556..33dfb04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -46,6 +46,8 @@ public class KTableKTableOuterJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
+    final private String storeName1 = "store-name-1";
+    final private String storeName2 = "store-name-2";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
@@ -78,8 +80,8 @@ public class KTableKTableOuterJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
         joined.toStream().process(processor);
 
@@ -179,8 +181,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
@@ -268,8 +270,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1);
-        table2 = builder.table(intSerde, stringSerde, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
+        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index cf74017..7666438 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -38,9 +39,9 @@ public class KTableMapKeysTest {
 
     final private Serde<String> stringSerde = new Serdes.StringSerde();
     final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
-
     private KStreamTestDriver driver = null;
 
+
     @After
     public void cleanup() {
         if (driver != null) {
@@ -55,7 +56,7 @@ public class KTableMapKeysTest {
 
         String topic1 = "topic_map_keys";
 
-        KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1);
+        KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1, "anyStoreName");
 
         final Map<Integer, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index efb17fc..5739397 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -65,7 +65,7 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
             @Override
             public Integer apply(String value) {
@@ -76,7 +76,7 @@ public class KTableMapValuesTest {
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
 
-        driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "02");
@@ -92,9 +92,11 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
+        String storeName1 = "storeName1";
+        String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -110,7 +112,7 @@ public class KTableMapValuesTest {
                     }
                 });
         KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                table1.through(stringSerde, stringSerde, topic2);
+                table1.through(stringSerde, stringSerde, topic2, storeName2);
 
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
@@ -211,7 +213,7 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -256,7 +258,7 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 84bfdd6..ad3f02c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -39,6 +39,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableSourceTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    final private Serde<Integer> intSerde = Serdes.Integer();
 
     private KStreamTestDriver driver = null;
     private File stateDir = null;
@@ -62,9 +63,9 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
 
-        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
 
         driver = new KStreamTestDriver(builder);
@@ -85,7 +86,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
 
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
@@ -130,7 +131,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
@@ -165,7 +166,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
 
         table1.enableSendingOldValues();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 1e1e3f4..ba71e05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -123,13 +123,13 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (value < aggregate) ? value : aggregate;
                     }
                 },
-                UnlimitedWindows.of("uwin-min"),
-                intSerde
+                UnlimitedWindows.of(),
+                intSerde, "uwin-min"
         ).toStream().map(
                 new Unwindow<String, Integer>()
         ).to(stringSerde, intSerde, "min");
 
-        KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min");
+        KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
         minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
 
         // max
@@ -145,13 +145,13 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (value > aggregate) ? value : aggregate;
                     }
                 },
-                UnlimitedWindows.of("uwin-max"),
-                intSerde
+                UnlimitedWindows.of(),
+                intSerde, "uwin-max"
         ).toStream().map(
                 new Unwindow<String, Integer>()
         ).to(stringSerde, intSerde, "max");
 
-        KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max");
+        KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
         maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
 
         // sum
@@ -167,23 +167,23 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (long) value + aggregate;
                     }
                 },
-                UnlimitedWindows.of("win-sum"),
-                longSerde
+                UnlimitedWindows.of(),
+                longSerde, "win-sum"
         ).toStream().map(
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "sum");
 
 
-        KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum");
+        KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
         sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
 
         // cnt
-        groupedData.count(UnlimitedWindows.of("uwin-cnt"))
+        groupedData.count(UnlimitedWindows.of(), "uwin-cnt")
             .toStream().map(
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "cnt");
 
-        KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt");
+        KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
         cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
 
         // dif
@@ -206,7 +206,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, doubleSerde, "avg");
 
         // windowed count
-        groupedData.count(TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE))
+        groupedData.count(TimeWindows.of(WINDOW_SIZE), "tumbling-win-cnt")
             .toStream().map(
                 new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                     @Override


Mime
View raw message