kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3870: Expose state store names in DSL
Date Mon, 18 Jul 2016 19:12:59 GMT
KAFKA-3870: Expose state store names in DSL

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Michael G. Noll, Guozhang Wang

Closes #1526 from enothereska/expose-names-dsl


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fbc51855
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fbc51855
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fbc51855

Branch: refs/heads/trunk
Commit: fbc5185543fd4895c7c81ff55b3c8b4c25ac7600
Parents: 7a70c1a
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Mon Jul 18 12:12:51 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jul 18 12:12:51 2016 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |  5 +-
 .../examples/pageview/PageViewUntypedDemo.java  |  5 +-
 .../kafka/streams/kstream/JoinWindows.java      | 13 ++--
 .../kafka/streams/kstream/KGroupedStream.java   | 55 ++++++++++----
 .../kafka/streams/kstream/KGroupedTable.java    | 32 ++++++--
 .../apache/kafka/streams/kstream/KStream.java   | 25 ++++++-
 .../kafka/streams/kstream/KStreamBuilder.java   | 17 +++--
 .../apache/kafka/streams/kstream/KTable.java    | 46 +++++++++---
 .../kafka/streams/kstream/TimeWindows.java      | 12 +--
 .../kafka/streams/kstream/UnlimitedWindows.java | 11 ++-
 .../apache/kafka/streams/kstream/Windows.java   | 12 +--
 .../kstream/internals/KGroupedStreamImpl.java   | 52 +++++++------
 .../kstream/internals/KGroupedTableImpl.java    | 28 +++----
 .../streams/kstream/internals/KStreamImpl.java  | 66 +++++++++--------
 .../streams/kstream/internals/KTableImpl.java   | 77 +++++++++++---------
 .../streams/kstream/internals/KTableSource.java | 12 +--
 .../KTableSourceValueGetterSupplier.java        |  8 +-
 .../KStreamAggregationIntegrationTest.java      |  8 +-
 .../KStreamKTableJoinIntegrationTest.java       |  3 +-
 .../integration/KStreamRepartitionJoinTest.java | 14 ++--
 .../kafka/streams/kstream/JoinWindowsTest.java  | 37 ++++------
 .../kafka/streams/kstream/TimeWindowsTest.java  | 36 ++++-----
 .../streams/kstream/UnlimitedWindowsTest.java   | 21 ++----
 .../kstream/internals/KStreamImplTest.java      |  4 +-
 .../internals/KStreamKStreamJoinTest.java       | 10 +--
 .../internals/KStreamKStreamLeftJoinTest.java   |  8 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  2 +-
 .../internals/KStreamWindowAggregateTest.java   | 12 +--
 .../kstream/internals/KTableAggregateTest.java  |  6 +-
 .../kstream/internals/KTableFilterTest.java     | 10 +--
 .../kstream/internals/KTableForeachTest.java    |  3 +-
 .../kstream/internals/KTableImplTest.java       | 29 +++++---
 .../kstream/internals/KTableKTableJoinTest.java | 14 ++--
 .../internals/KTableKTableLeftJoinTest.java     | 14 ++--
 .../internals/KTableKTableOuterJoinTest.java    | 14 ++--
 .../kstream/internals/KTableMapKeysTest.java    |  5 +-
 .../kstream/internals/KTableMapValuesTest.java  | 14 ++--
 .../kstream/internals/KTableSourceTest.java     | 11 +--
 .../streams/smoketest/SmokeTestClient.java      | 24 +++---
 39 files changed, 431 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 19391d8..f88f62f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -143,7 +143,8 @@ public class PageViewTypedDemo {
 
         KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input");
 
-        KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde, "streams-userprofile-input");
+        KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde,
+            "streams-userprofile-input", "streams-userprofile-store-name");
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
                 .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
@@ -168,7 +169,7 @@ public class PageViewTypedDemo {
                     }
                 })
                 .groupByKey(Serdes.String(), pageViewByRegionSerde)
-                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
+                .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
                 // TODO: we can merge this toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index e9aa467..77cf0ca 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -73,7 +73,8 @@ public class PageViewUntypedDemo {
 
         KStream<String, JsonNode> views = builder.stream(Serdes.String(), jsonSerde, "streams-pageview-input");
 
-        KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde, "streams-userprofile-input");
+        KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde,
+            "streams-userprofile-input", "streams-userprofile-store-name");
 
         KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
             @Override
@@ -100,7 +101,7 @@ public class PageViewUntypedDemo {
                     }
                 })
                 .groupByKey(Serdes.String(), jsonSerde)
-                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
+                .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
                 // TODO: we can merge this toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 309a9e6..2552148 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -52,13 +52,12 @@ public class JoinWindows extends Windows<TimeWindow> {
     /** Maximum time difference for tuples that are after the join tuple. */
     public final long after;
 
-    private JoinWindows(String name, long before, long after) {
-        super(name);
+    private JoinWindows(long before, long after) {
+        super();
 
         if (before + after < 0) {
             throw new IllegalArgumentException("Window interval (ie, before+after) must not be negative");
         }
-
         this.after = after;
         this.before = before;
     }
@@ -69,8 +68,8 @@ public class JoinWindows extends Windows<TimeWindow> {
      *
      * @param timeDifference    join window interval
      */
-    public static JoinWindows of(String name, long timeDifference) {
-        return new JoinWindows(name, timeDifference, timeDifference);
+    public static JoinWindows of(long timeDifference) {
+        return new JoinWindows(timeDifference, timeDifference);
     }
 
     /**
@@ -81,7 +80,7 @@ public class JoinWindows extends Windows<TimeWindow> {
      * @param timeDifference    join window interval
      */
     public JoinWindows before(long timeDifference) {
-        return new JoinWindows(this.name, timeDifference, this.after);
+        return new JoinWindows(timeDifference, this.after);
     }
 
     /**
@@ -92,7 +91,7 @@ public class JoinWindows extends Windows<TimeWindow> {
      * @param timeDifference    join window interval
      */
     public JoinWindows after(long timeDifference) {
-        return new JoinWindows(this.name, this.before, timeDifference);
+        return new JoinWindows(this.before, timeDifference);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 25fdb3a..16b55d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -36,55 +36,73 @@ public interface KGroupedStream<K, V> {
 
     /**
      * Combine values of this stream by the grouped key into a new instance of ever-updating
-     * {@link KTable}.
+     * {@link KTable}. The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param reducer           the instance of {@link Reducer}
-     * @param name              the name of the resulted {@link KTable}
+     * @param storeName         the name of the underlying {@link KTable} state store
      *
      * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
      */
     KTable<K, V> reduce(Reducer<V> reducer,
-                        String name);
+                        final String storeName);
 
 
     /**
      * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param reducer           the instance of {@link Reducer}
      * @param windows           the specification of the aggregation {@link Windows}
+     * @param storeName         the name of the state store created from this operation
      * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
      *         where each table contains records with unmodified keys and values
      *         that represent the latest (rolling) aggregate for each key within that window
      */
     <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
-                                                     Windows<W> windows);
+                                                     Windows<W> windows,
+                                                     final String storeName);
 
     /**
      * Aggregate values of this stream by key into a new instance of a {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param initializer   the instance of {@link Initializer}
      * @param aggregator    the instance of {@link Aggregator}
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param storeName     the name of the state store created from this operation
+     * @param <T>           the value type of the resulting {@link KTable}
      *
      * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
      */
     <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                Aggregator<K, V, T> aggregator,
                                Serde<T> aggValueSerde,
-                               String name);
+                               final String storeName);
 
     /**
      * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param initializer   the instance of {@link Initializer}
      * @param aggregator    the instance of {@link Aggregator}
      * @param windows       the specification of the aggregation {@link Windows}
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param <T>           the value type of the resulted {@link KTable}
-     *
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @param storeName     the name of the state store created from this operation
      * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
      *         where each table contains records with unmodified keys and values with type {@code T}
      *         that represent the latest (rolling) aggregate for each key within that window
@@ -92,28 +110,37 @@ public interface KGroupedStream<K, V> {
     <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
                                                            Aggregator<K, V, T> aggregator,
                                                            Windows<W> windows,
-                                                           Serde<T> aggValueSerde);
+                                                           Serde<T> aggValueSerde,
+                                                           final String storeName);
 
 
     /**
-     * Count number of records of this stream by key into a new instance of a {@link KTable}
+     * Count number of records of this stream by key into a new instance of a {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
-     * @param name  the name of the resulted {@link KTable}
+     * @param storeName  the name of the underlying {@link KTable} state store
      *
      * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
      */
-    KTable<K, Long> count(String name);
+    KTable<K, Long> count(final String storeName);
 
 
     /**
      * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param windows   the specification of the aggregation {@link Windows}
-     *
+     * @param storeName the name of the state store created from this operation
      * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
      *         where each table contains records with unmodified keys and values
      *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
      */
-    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows);
+    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, final String storeName);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 2ebad87..a6b2798 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -35,26 +35,34 @@ public interface KGroupedTable<K, V> {
 
     /**
      * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param adder         the instance of {@link Reducer} for addition
      * @param subtractor    the instance of {@link Reducer} for subtraction
-     * @param name          the name of the resulted {@link KTable}
+     * @param storeName     the name of the underlying {@link KTable} state store
      * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable},
      *         containing aggregated values for each key
      */
     KTable<K, V> reduce(Reducer<V> adder,
                         Reducer<V> subtractor,
-                        String name);
+                        String storeName);
 
     /**
      * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param initializer   the instance of {@link Initializer}
      * @param adder         the instance of {@link Aggregator} for addition
      * @param substractor   the instance of {@link Aggregator} for subtraction
      * @param aggValueSerde value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param name          the name of the resulted table
+     * @param storeName     the name of the underlying {@link KTable} state store
      * @param <T>           the value type of the aggregated {@link KTable}
      * @return a {@link KTable} with same key and aggregated value type {@code T},
      *         containing aggregated values for each key
@@ -63,16 +71,20 @@ public interface KGroupedTable<K, V> {
                                Aggregator<K, V, T> adder,
                                Aggregator<K, V, T> substractor,
                                Serde<T> aggValueSerde,
-                               String name);
+                               String storeName);
 
     /**
      * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
      * using default serializers and deserializers.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param initializer   the instance of {@link Initializer}
      * @param adder         the instance of {@link Aggregator} for addition
      * @param substractor   the instance of {@link Aggregator} for subtraction
-     * @param name          the name of the resulted {@link KTable}
+     * @param storeName     the name of the underlying {@link KTable} state store
      * @param <T>           the value type of the aggregated {@link KTable}
      * @return a {@link KTable} with same key and aggregated value type {@code T},
      *         containing aggregated values for each key
@@ -80,15 +92,19 @@ public interface KGroupedTable<K, V> {
     <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                Aggregator<K, V, T> adder,
                                Aggregator<K, V, T> substractor,
-                               String name);
+                               String storeName);
 
     /**
      * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
-     * @param name          the name of the resulted {@link KTable}
+     * @param storeName     the name of the underlying {@link KTable} state store
      * @return a {@link KTable} with same key and {@link Long} value type as this {@link KGroupedTable},
      *         containing the number of values for each key
      */
-    KTable<K, Long> count(String name);
+    KTable<K, Long> count(String storeName);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index f526889..060a1ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -373,6 +373,10 @@ public interface KStream<K, V> {
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
     * If a record key is null it will not be included in the resulting {@link KStream}
+     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
+     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -400,13 +404,16 @@ public interface KStream<K, V> {
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
     * with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}
+     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
+     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
      * @param windows       the specification of the {@link JoinWindows}
      * @param <V1>          the value type of the other stream
      * @param <R>           the value type of the new stream
-     *
      * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
      *         one for each matched record-pair with the same key and within the joining window intervals
      */
@@ -418,6 +425,10 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
     * If a record key is null it will not be included in the resulting {@link KStream}
+     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
+     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -445,6 +456,10 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
     * with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}
+     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
+     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -463,6 +478,10 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
     * If a record key is null it will not be included in the resulting {@link KStream}
+     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
+     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -490,6 +509,10 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
     * with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}
+     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
+     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 307dcab..2df1bcb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -118,35 +118,40 @@ public class KStreamBuilder extends TopologyBuilder {
      * Create a {@link KTable} instance for the specified topic.
      * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime.
      * The default deserializers specified in the config are used.
+     * The resulting {@link KTable} will be materialized in a local state store with the given store name.
+     * However, no new changelog topic is created in this case since the underlying topic acts as one.
      *
      * @param topic     the topic name; cannot be null
+     * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected
      * @return a {@link KTable} for the specified topics
      */
-    public <K, V> KTable<K, V> table(String topic) {
-        return table(null, null, topic);
+    public <K, V> KTable<K, V> table(String topic, final String storeName) {
+        return table(null, null, topic, storeName);
     }
 
     /**
      * Create a {@link KTable} instance for the specified topic.
      * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime.
+     * The resulting {@link KTable} will be materialized in a local state store with the given store name.
+     * However, no new changelog topic is created in this case since the underlying topic acts as one.
      *
      * @param keySerde   key serde used to send key-value pairs,
      *                   if not specified the default key serde defined in the configuration will be used
      * @param valSerde   value serde used to send key-value pairs,
      *                   if not specified the default value serde defined in the configuration will be used
      * @param topic      the topic name; cannot be null
+     * @param storeName  the state store name used if this KTable is materialized, can be null if materialization is not expected
      * @return a {@link KTable} for the specified topics
      */
-    public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+    public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
         String source = newName(KStreamImpl.SOURCE_NAME);
         String name = newName(KTableImpl.SOURCE_NAME);
 
         addSource(source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic);
 
-        ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(topic);
+        ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
         addProcessor(name, processorSupplier, source);
-
-        return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde);
+        return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index c16b3d2..29be3e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -168,26 +168,34 @@ public interface KTable<K, V> {
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
      * using default serializers and deserializers and producer's {@link DefaultPartitioner}.
-     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
+     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String, String)}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param topic         the topic name
-     *
+     * @param storeName     the state store name used for this KTable
      * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
-    KTable<K, V> through(String topic);
+    KTable<K, V> through(String topic, String storeName);
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic using default serializers
      * and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
-     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
+     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String, String)}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
      *                     if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic        the topic name
-     *
+     * @param storeName    the state store name used for this KTable
      * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
-    KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+    KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic, String storeName);
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic.
@@ -195,23 +203,31 @@ public interface KTable<K, V> {
      * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
      * &mdash; otherwise producer's {@link DefaultPartitioner} is used.
      * This is equivalent to calling {@link #to(Serde, Serde, String)} and
-     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String, String)}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param keySerde     key serde used to send key-value pairs,
      *                     if not specified the default key serde defined in the configuration will be used
      * @param valSerde     value serde used to send key-value pairs,
      *                     if not specified the default value serde defined in the configuration will be used
      * @param topic        the topic name
-     *
+     * @param storeName    the state store name used for this KTable
      * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
-    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
+    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic, String storeName);
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
      * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
-     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String, String)}.
+     * The resulting {@link KTable} will be materialized in a local state
+     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
+     * will be automatically created in Kafka for failure recovery, where "applicationId"
+     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
      *
      * @param keySerde     key serde used to send key-value pairs,
      *                     if not specified the default key serde defined in the configuration will be used
@@ -222,10 +238,10 @@ public interface KTable<K, V> {
      *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
      *                     &mdash; otherwise {@link DefaultPartitioner} will be used
      * @param topic        the topic name
-     *
+     * @param storeName    the state store name used for this KTable
      * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
-    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic, String storeName);
 
     /**
      * Materialize this stream to a topic using default serializers specified in the config
@@ -368,4 +384,10 @@ public interface KTable<K, V> {
      * @param action an action to perform on each element
      */
     void foreach(ForeachAction<K, V> action);
+
+    /**
+     * Get the name of the local state store used for materializing this {@link KTable}
+     * @return the underlying state store name, or {@code null} if KTable does not have one
+     */
+    String getStoreName();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 001e92e..1ec4628 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -49,8 +49,9 @@ public class TimeWindows extends Windows<TimeWindow> {
      */
     public final long advance;
 
-    private TimeWindows(String name, long size, long advance) {
-        super(name);
+
+    private TimeWindows(long size, long advance) {
+        super();
         if (size <= 0) {
             throw new IllegalArgumentException("window size must be > 0 (you provided " + size + ")");
         }
@@ -70,14 +71,13 @@ public class TimeWindows extends Windows<TimeWindow> {
      * This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
      * non-overlapping windows. Tumbling windows are a specialization of hopping windows.
      *
-     * @param name The name of the window. Must not be null or empty.
      * @param size The size of the window, with the requirement that size &gt; 0.
      *             The window size's effective time unit is determined by the semantics of the
      *             topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
      * @return a new window definition
      */
-    public static TimeWindows of(String name, long size) {
-        return new TimeWindows(name, size, size);
+    public static TimeWindows of(long size) {
+        return new TimeWindows(size, size);
     }
 
     /**
@@ -94,7 +94,7 @@ public class TimeWindows extends Windows<TimeWindow> {
      * @return a new window definition
      */
     public TimeWindows advanceBy(long interval) {
-        return new TimeWindows(this.name, this.size, interval);
+        return new TimeWindows(this.size, interval);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index f45f8c5..971f3c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -32,9 +32,8 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     /** The start timestamp of the window. */
     public final long start;
 
-    private UnlimitedWindows(String name, long start) {
-        super(name);
-
+    private UnlimitedWindows(long start) {
+        super();
         if (start < 0) {
             throw new IllegalArgumentException("start must be > 0 (you provided " + start + ")");
         }
@@ -44,8 +43,8 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     /**
      * Return an unlimited window starting at timestamp zero.
      */
-    public static UnlimitedWindows of(String name) {
-        return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP);
+    public static UnlimitedWindows of() {
+        return new UnlimitedWindows(DEFAULT_START_TIMESTAMP);
     }
 
     /**
@@ -55,7 +54,7 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
      * @return       a new unlimited window that starts at {@code start}
      */
     public UnlimitedWindows startOn(long start) {
-        return new UnlimitedWindows(this.name, start);
+        return new UnlimitedWindows(start);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 164e584..d0a5861 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -30,25 +30,15 @@ public abstract class Windows<W extends Window> {
 
     private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
 
-    protected String name;
-
     private long maintainDurationMs;
 
     public int segments;
 
-    protected Windows(String name) {
-        if (name == null || name.isEmpty()) {
-            throw new IllegalArgumentException("name must not be null or empty");
-        }
-        this.name = name;
+    protected Windows() {
         this.segments = DEFAULT_NUM_SEGMENTS;
         this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION;
     }
 
-    public String name() {
-        return name;
-    }
-
     /**
      * Set the window maintain duration in milliseconds of system time.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 1830484..51fd116 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -54,22 +54,23 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
 
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
-                               final String name) {
+                               final String storeName) {
         return doAggregate(
-            new KStreamReduce<K, V>(name, reducer),
+            new KStreamReduce<K, V>(storeName, reducer),
             REDUCE_NAME,
-            keyValueStore(valSerde, name));
+            keyValueStore(valSerde, storeName));
     }
 
 
     @SuppressWarnings("unchecked")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
-                                                            Windows<W> windows) {
+                                                            Windows<W> windows,
+                                                            final String storeName) {
         return (KTable<Windowed<K>, V>) doAggregate(
-            new KStreamWindowReduce<K, V, W>(windows, windows.name(), reducer),
+            new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
             REDUCE_NAME,
-            windowedStore(valSerde, windows)
+            windowedStore(valSerde, windows, storeName)
         );
     }
 
@@ -77,11 +78,11 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<K, V, T> aggregator,
                                       final Serde<T> aggValueSerde,
-                                      final String name) {
+                                      final String storeName) {
         return doAggregate(
-            new KStreamAggregate<>(name, initializer, aggregator),
+            new KStreamAggregate<>(storeName, initializer, aggregator),
             AGGREGATE_NAME,
-            keyValueStore(aggValueSerde, name));
+            keyValueStore(aggValueSerde, storeName));
     }
 
     @SuppressWarnings("unchecked")
@@ -89,16 +90,17 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<K, V, T> aggregator,
                                                                   final Windows<W> windows,
-                                                                  final Serde<T> aggValueSerde) {
+                                                                  final Serde<T> aggValueSerde,
+                                                                  final String storeName) {
         return (KTable<Windowed<K>, T>) doAggregate(
-            new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator),
+            new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
             AGGREGATE_NAME,
-            windowedStore(aggValueSerde, windows)
+            windowedStore(aggValueSerde, windows, storeName)
         );
     }
 
     @Override
-    public KTable<K, Long> count(final String name) {
+    public KTable<K, Long> count(final String storeName) {
         return aggregate(new Initializer<Long>() {
             @Override
             public Long apply() {
@@ -109,11 +111,11 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
             public Long apply(K aggKey, V value, Long aggregate) {
                 return aggregate + 1;
             }
-        }, Serdes.Long(), name);
+        }, Serdes.Long(), storeName);
     }
 
     @Override
-    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
+    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String storeName) {
         return aggregate(new Initializer<Long>() {
             @Override
             public Long apply() {
@@ -124,7 +126,7 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
             public Long apply(K aggKey, V value, Long aggregate) {
                 return aggregate + 1;
             }
-        }, windows, Serdes.Long());
+        }, windows, Serdes.Long(), storeName);
     }
 
     private <T> StateStoreSupplier keyValueStore(final Serde<T> aggValueSerde, final String name) {
@@ -133,16 +135,17 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
 
 
     private <W extends Window, T> StateStoreSupplier windowedStore(final Serde<T> aggValSerde,
-                                                                   final Windows<W> windows) {
-        return storeFactory(aggValSerde, windows.name())
+                                                                   final Windows<W> windows,
+                                                                   final String storeName) {
+        return storeFactory(aggValSerde, storeName)
             .windowed(windows.maintainMs(), windows.segments, false)
             .build();
 
     }
 
     private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<T> aggValueSerde,
-                                                                    final String name) {
-        return Stores.create(name)
+                                                                    final String storeName) {
+        return Stores.create(storeName)
             .withKeys(keySerde)
             .withValues(aggValueSerde)
             .persistent();
@@ -156,7 +159,7 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
 
         final String aggFunctionName = topology.newName(functionName);
 
-        final String sourceName = repartitionIfRequired();
+        final String sourceName = repartitionIfRequired(storeSupplier.name());
 
         topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
         topology.addStateStore(storeSupplier, aggFunctionName);
@@ -165,16 +168,17 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
                                 aggFunctionName,
                                 aggregateSupplier,
                                 sourceName.equals(this.name) ? sourceNodes
-                                                             : Collections.singleton(sourceName));
+                                                             : Collections.singleton(sourceName),
+                                storeSupplier.name());
     }
 
     /**
      * @return the new sourceName if repartitioned. Otherwise the name of this stream
      */
-    private String repartitionIfRequired() {
+    private String repartitionIfRequired(final String storeName) {
         if (!repartitionRequired) {
             return this.name;
         }
-        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde);
+        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, storeName);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 7118bb9..5039c04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -63,30 +63,30 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                                       Aggregator<K, V, T> adder,
                                       Aggregator<K, V, T> subtractor,
                                       Serde<T> aggValueSerde,
-                                      String name) {
+                                      String storeName) {
 
-        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
-        return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, name);
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeName, initializer, adder, subtractor);
+        return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, storeName);
     }
 
     @Override
     public <T> KTable<K, T> aggregate(Initializer<T> initializer,
                             Aggregator<K, V, T> adder,
                             Aggregator<K, V, T> substractor,
-                            String name) {
+                            String storeName) {
 
-        return aggregate(initializer, adder, substractor, null, name);
+        return aggregate(initializer, adder, substractor, null, storeName);
     }
 
     private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          Serde<T> aggValueSerde,
                                          String functionName,
-                                         String name) {
+                                         String storeName) {
         String sinkName = topology.newName(KStreamImpl.SINK_NAME);
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
         String funcName = topology.newName(functionName);
 
-        String topic = name + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+        String topic = storeName + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
@@ -96,7 +96,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
         ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
-        StateStoreSupplier aggregateStore = Stores.create(name)
+        StateStoreSupplier aggregateStore = Stores.create(storeName)
             .withKeys(keySerde)
             .withValues(aggValueSerde)
             .persistent()
@@ -114,19 +114,19 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         topology.addStateStore(aggregateStore, funcName);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName));
+        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeName);
     }
 
     @Override
     public KTable<K, V> reduce(Reducer<V> adder,
                                Reducer<V> subtractor,
-                               String name) {
-        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
-        return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, name);
+                               String storeName) {
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeName, adder, subtractor);
+        return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, storeName);
     }
 
     @Override
-    public KTable<K, Long> count(String name) {
+    public KTable<K, Long> count(String storeName) {
         return this.aggregate(
                 new Initializer<Long>() {
                     @Override
@@ -145,7 +145,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                         return aggregate - 1L;
                     }
                 },
-                Serdes.Long(), name);
+                Serdes.Long(), storeName);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b79532e..7ecbf66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -415,12 +415,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             boolean outer) {
 
         return doJoin(other,
-                      joiner,
-                      windows,
-                      keySerde,
-                      thisValueSerde,
-                      otherValueSerde,
-                      new DefaultJoin(outer));
+            joiner,
+            windows,
+            keySerde,
+            thisValueSerde,
+            otherValueSerde,
+            new DefaultJoin(outer));
     }
 
     private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other,
@@ -434,22 +434,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
 
         if (joinThis.repartitionRequired) {
-            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde);
+            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null);
         }
 
         if (joinOther.repartitionRequired) {
-            joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde);
+            joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde, null);
         }
 
         joinThis.ensureJoinableWith(joinOther);
 
         return join.join(joinThis,
-                         joinOther,
-                         joiner,
-                         windows,
-                         keySerde,
-                         thisValueSerde,
-                         otherValueSerde);
+            joinOther,
+            joiner,
+            windows,
+            keySerde,
+            thisValueSerde,
+            otherValueSerde);
     }
 
 
@@ -458,25 +458,30 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
      * an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
      * @param keySerde      Serdes for serializing the keys
      * @param valSerde      Serdes for serializing the values
+     * @param topicNamePrefix  prefix of topic name created for repartitioning, can be null,
+     *                         in which case the prefix will be auto-generated internally.
      * @return a new {@link KStreamImpl}
      */
     private KStreamImpl<K, V> repartitionForJoin(Serde<K> keySerde,
-                                                 Serde<V> valSerde) {
+                                                 Serde<V> valSerde,
+                                                 final String topicNamePrefix) {
 
-        String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde);
+        String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, topicNamePrefix);
         return new KStreamImpl<>(topology, repartitionedSourceName, Collections
             .singleton(repartitionedSourceName), false);
     }
 
     static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
                                                     Serde<K1> keySerde,
-                                                    Serde<V1> valSerde) {
+                                                    Serde<V1> valSerde,
+                                                    final String topicNamePrefix) {
         Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
         Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
         Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
         Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
+        String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;
 
-        String repartitionTopic = stream.name + REPARTITION_TOPIC_SUFFIX;
+        String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
         String sinkName = stream.topology.newName(SINK_NAME);
         String filterName = stream.topology.newName(FILTER_NAME);
         String sourceName = stream.topology.newName(SOURCE_NAME);
@@ -539,8 +544,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         if (repartitionRequired) {
             KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
-                                                                                valueSerde
-            );
+                                                                                valueSerde, null);
             return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner);
         } else {
             return doStreamTableLeftJoin(other, joiner);
@@ -598,8 +602,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,
                                                                      final Serde<K> keySerde,
                                                                      final Serde<V> valueSerde,
-                                                                     final String nameSuffix) {
-        return Stores.create(windows.name() + nameSuffix)
+                                                                     final String storeName) {
+        return Stores.create(storeName)
             .withKeys(keySerde)
             .withValues(valueSerde)
             .persistent()
@@ -634,18 +638,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    Serde<K1> keySerde,
                                                    Serde<V1> lhsValueSerde,
                                                    Serde<V2> otherValueSerde) {
+            String thisWindowStreamName = topology.newName(WINDOWED_NAME);
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+            String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+            String joinMergeName = topology.newName(MERGE_NAME);
 
             StateStoreSupplier thisWindow =
-                createWindowedStateStore(windows, keySerde, lhsValueSerde, "-this");
+                createWindowedStateStore(windows, keySerde, lhsValueSerde,  joinThisName + "-store");
 
             StateStoreSupplier otherWindow =
-                createWindowedStateStore(windows, keySerde, otherValueSerde, "-other");
+                createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store");
 
 
             KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(),
                                                                                    windows.before + windows.after + 1,
                                                                                    windows.maintainMs());
-            KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindow .name(),
+            KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(),
                                                                                     windows.before + windows.after + 1,
                                                                                     windows.maintainMs());
 
@@ -662,11 +671,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
             KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
 
-            String thisWindowStreamName = topology.newName(WINDOWED_NAME);
-            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-            String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
-            String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
-            String joinMergeName = topology.newName(MERGE_NAME);
 
             topology.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream) lhs).name);
             topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
@@ -694,7 +698,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    Serde<V1> lhsValueSerde,
                                                    Serde<V2> otherValueSerde) {
             StateStoreSupplier otherWindow =
-                createWindowedStateStore(windows, keySerde, otherValueSerde, "-other");
+                createWindowedStateStore(windows, keySerde, otherValueSerde, name + "other");
 
             KStreamJoinWindow<K1, V1>
                 otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 49d2762..17f4716 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -78,14 +78,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
+    private final String storeName;
 
     private boolean sendOldValues = false;
 
+
     public KTableImpl(KStreamBuilder topology,
                       String name,
                       ProcessorSupplier<?, ?> processorSupplier,
-                      Set<String> sourceNodes) {
-        this(topology, name, processorSupplier, sourceNodes, null, null);
+                      Set<String> sourceNodes,
+                      final String storeName) {
+        this(topology, name, processorSupplier, sourceNodes, null, null, storeName);
     }
 
     public KTableImpl(KStreamBuilder topology,
@@ -93,11 +96,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                       ProcessorSupplier<?, ?> processorSupplier,
                       Set<String> sourceNodes,
                       Serde<K> keySerde,
-                      Serde<V> valSerde) {
+                      Serde<V> valSerde,
+                      final String storeName) {
         super(topology, name, sourceNodes);
         this.processorSupplier = processorSupplier;
         this.keySerde = keySerde;
         this.valSerde = valSerde;
+        this.storeName = storeName;
+    }
+
+    @Override
+    public String getStoreName() {
+        return this.storeName;
     }
 
     @Override
@@ -106,7 +116,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
         topology.addProcessor(name, processorSupplier, this.name);
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
     }
 
     @Override
@@ -116,7 +126,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         topology.addProcessor(name, processorSupplier, this.name);
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
     }
 
     @Override
@@ -126,7 +136,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         topology.addProcessor(name, processorSupplier, this.name);
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
     }
 
     @Override
@@ -184,16 +194,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> through(Serde<K> keySerde,
-                                Serde<V> valSerde,
-                                StreamPartitioner<K, V> partitioner,
-                                String topic) {
-        to(keySerde, valSerde, partitioner, topic);
-
-        return topology.table(keySerde, valSerde, topic);
-    }
-
-    @Override
     public void foreach(final ForeachAction<K, V> action) {
         String name = topology.newName(FOREACH_NAME);
         KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
@@ -206,18 +206,29 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
-        return through(keySerde, valSerde, null, topic);
+    public KTable<K, V> through(Serde<K> keySerde,
+                                Serde<V> valSerde,
+                                StreamPartitioner<K, V> partitioner,
+                                String topic,
+                                final String storeName) {
+        to(keySerde, valSerde, partitioner, topic);
+
+        return topology.table(keySerde, valSerde, topic, storeName);
+    }
+
+    @Override
+    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
+        return through(keySerde, valSerde, null, topic, storeName);
     }
 
     @Override
-    public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
-        return through(null, null, partitioner, topic);
+    public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic, final String storeName) {
+        return through(null, null, partitioner, topic, storeName);
     }
 
     @Override
-    public KTable<K, V> through(String topic) {
-        return through(null, null, null, topic);
+    public KTable<K, V> through(String topic, final String storeName) {
+        return through(null, null, null, topic, storeName);
     }
 
     @Override
@@ -271,15 +282,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
         KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
         KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
         );
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }
 
     @SuppressWarnings("unchecked")
@@ -294,15 +305,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
         KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
         KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
         );
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }
 
     @SuppressWarnings("unchecked")
@@ -317,15 +328,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
         KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
         KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
         );
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
 
-        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
+        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }
 
     @Override
@@ -354,7 +365,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         if (processorSupplier instanceof KTableSource) {
             KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             materialize(source);
-            return new KTableSourceValueGetterSupplier<>(source.topic);
+            return new KTableSourceValueGetterSupplier<>(source.storeName);
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
         } else {
@@ -386,7 +397,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         synchronized (source) {
             if (!source.isMaterialized()) {
                 StateStoreSupplier storeSupplier =
-                        new KTableStoreSupplier<>(source.topic, keySerde, valSerde, null);
+                        new KTableStoreSupplier<>(source.storeName, keySerde, valSerde, null);
                 // mark this state as non internal hence it is read directly from a user topic
                 topology.addStateStore(storeSupplier, false, name);
                 source.materialize();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 5aafc02..05befed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -26,13 +26,13 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
-    public final String topic;
+    public final String storeName;
 
     private boolean materialized = false;
     private boolean sendOldValues = false;
 
-    public KTableSource(String topic) {
-        this.topic = topic;
+    public KTableSource(String storeName) {
+        this.storeName = storeName;
     }
 
     @Override
@@ -57,7 +57,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         public void process(K key, V value) {
             // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for the source KTable from topic " + topic + " should not be null.");
+                throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
 
             context().forward(key, new Change<>(value, null));
         }
@@ -71,14 +71,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
-            store = (KeyValueStore<K, V>) context.getStateStore(topic);
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         @Override
         public void process(K key, V value) {
             // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for the source KTable from topic " + topic + " should not be null.");
+                throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
 
             V oldValue = sendOldValues ? store.get(key) : null;
             store.put(key, value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index dab92d5..fe41fa0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -22,10 +22,10 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
 
-    private final String topic;
+    private final String storeName;
 
-    public KTableSourceValueGetterSupplier(String topic) {
-        this.topic = topic;
+    public KTableSourceValueGetterSupplier(String storeName) {
+        this.storeName = storeName;
     }
 
     public KTableValueGetter<K, V> get() {
@@ -38,7 +38,7 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
 
         @SuppressWarnings("unchecked")
         public void init(ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(topic);
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         public V get(K key) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index b91a907..71aebad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -174,7 +174,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondBatchTimestamp);
 
         groupedStream
-            .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L))
+            .reduce(reducer, TimeWindows.of(500L), "reduce-time-windows")
             .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
                 @Override
                 public String apply(Windowed<String> windowedKey, String value) {
@@ -276,8 +276,8 @@ public class KStreamAggregationIntegrationTest {
         groupedStream.aggregate(
             initializer,
             aggregator,
-            TimeWindows.of("aggregate-by-key-windowed", 500L),
-            Serdes.Integer())
+            TimeWindows.of(500L),
+            Serdes.Integer(), "aggregate-by-key-windowed")
             .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
                 @Override
                 public String apply(Windowed<String> windowedKey, Integer value) {
@@ -371,7 +371,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(timestamp);
 
         stream.groupByKey(Serdes.Integer(), Serdes.String())
-            .count(TimeWindows.of("count-windows", 500L))
+            .count(TimeWindows.of(500L), "count-windows")
             .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
                 @Override
                 public String apply(final Windowed<Integer> windowedKey, final Long value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index b7d4fc3..e290eb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -56,6 +56,7 @@ public class KStreamKTableJoinIntegrationTest {
     public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
     private static final String USER_CLICKS_TOPIC = "user-clicks";
     private static final String USER_REGIONS_TOPIC = "user-regions";
+    private static final String USER_REGIONS_STORE_NAME = "user-regions-store-name";
     private static final String OUTPUT_TOPIC = "output-topic";
 
     @BeforeClass
@@ -169,7 +170,7 @@ public class KStreamKTableJoinIntegrationTest {
         // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
         // (which overrides her previous region value of "asia").
         KTable<String, String> userRegionsTable =
-            builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC);
+            builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC, USER_REGIONS_STORE_NAME);
 
         // Compute the number of clicks per region, e.g. "europe" -> 13L.
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 434216e..ba6956d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -203,7 +203,7 @@ public class KStreamRepartitionJoinTest {
         streamTwo
             .join(streamOne.map(keyMapper),
                   joiner,
-                  getJoinWindow(output),
+                  getJoinWindow(),
                   Serdes.Integer(),
                   Serdes.String(),
                   Serdes.Integer())
@@ -221,7 +221,7 @@ public class KStreamRepartitionJoinTest {
         String outputTopic = "left-join";
         map1.leftJoin(map2,
                       valueJoiner,
-                      getJoinWindow("the-left-join"),
+                      getJoinWindow(),
                       Serdes.Integer(),
                       Serdes.Integer(),
                       Serdes.String())
@@ -241,7 +241,7 @@ public class KStreamRepartitionJoinTest {
 
         final KStream<Integer, String> join = map1.join(map2,
                                                         valueJoiner,
-                                                        getJoinWindow("join-one"),
+                                                        getJoinWindow(),
                                                         Serdes.Integer(),
                                                         Serdes.Integer(),
                                                         Serdes.String());
@@ -256,7 +256,7 @@ public class KStreamRepartitionJoinTest {
         join.map(kvMapper)
             .join(streamFour.map(kvMapper),
                   joiner,
-                  getJoinWindow("the-other-join"),
+                  getJoinWindow(),
                   Serdes.Integer(),
                   Serdes.String(),
                   Serdes.String())
@@ -267,8 +267,8 @@ public class KStreamRepartitionJoinTest {
                             topic);
     }
 
-    private JoinWindows getJoinWindow(String name) {
-        return (JoinWindows) JoinWindows.of(name, WINDOW_SIZE).until(3 * WINDOW_SIZE);
+    private JoinWindows getJoinWindow() {
+        return (JoinWindows) JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
     }
 
 
@@ -395,7 +395,7 @@ public class KStreamRepartitionJoinTest {
         CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
                  valueJoiner,
-                 getJoinWindow(joinName),
+                 getJoinWindow(),
                  Serdes.Integer(),
                  Serdes.Integer(),
                  Serdes.String())


Mime
View raw message