kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3914: Global discovery of state stores
Date Wed, 10 Aug 2016 21:25:33 GMT
KAFKA-3914: Global discovery of state stores

guozhangwang, enothereska, mjsax, miguno — please take a look. A few things that need to be clarified:

1. I've added StreamsConfig.USER_ENDPOINT_CONFIG, but should we have separate configs for host and port or is this one config ok?
2. `HostState` in the KIP has a byte[] field - not sure why and what it would be populated with
3. I've changed the API to return `List<KafkaStreamsInstance>` as opposed to `Map<HostInfo, Set<TaskMetadata>>` as I find this far more intuitive to work with.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Michael G. Noll, Eno Thereska, Guozhang Wang

Closes #1576 from dguy/kafka-3914v2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68b5d014
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68b5d014
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68b5d014

Branch: refs/heads/trunk
Commit: 68b5d014fa3bf2e18da253ded2bbcd4f5d4a9d8d
Parents: caa9bd0
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Aug 10 14:25:23 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 10 14:25:23 2016 -0700

----------------------------------------------------------------------
 .../producer/internals/DefaultPartitioner.java  |  22 +-
 .../org/apache/kafka/common/utils/Utils.java    |  16 ++
 .../org/apache/kafka/streams/KafkaStreams.java  |  94 ++++++-
 .../org/apache/kafka/streams/StreamsConfig.java |  10 +
 .../internals/WindowedStreamPartitioner.java    |   5 +-
 .../streams/processor/TopologyBuilder.java      |  55 +++-
 .../internals/DefaultStreamPartitioner.java     |  43 +++
 .../internals/StreamPartitionAssignor.java      | 103 +++++++-
 .../processor/internals/StreamThread.java       |  10 +-
 .../internals/StreamsMetadataState.java         | 237 +++++++++++++++++
 .../internals/assignment/AssignmentInfo.java    | 155 ++++++-----
 .../internals/assignment/SubscriptionInfo.java  |  91 ++++---
 .../apache/kafka/streams/state/HostInfo.java    |  81 ++++++
 .../kafka/streams/state/StreamsMetadata.java    |  93 +++++++
 .../apache/kafka/streams/KafkaStreamsTest.java  |  43 ++-
 .../integration/RegexSourceIntegrationTest.java |   3 +-
 .../streams/kstream/KStreamBuilderTest.java     |  51 ++++
 .../streams/processor/TopologyBuilderTest.java  |  35 +++
 .../internals/StreamPartitionAssignorTest.java  | 185 +++++++++++--
 .../processor/internals/StreamThreadTest.java   |   8 +-
 .../internals/StreamsMetadataStateTest.java     | 262 +++++++++++++++++++
 .../assignment/AssginmentInfoTest.java          |  50 ----
 .../assignment/AssignmentInfoTest.java          | 107 ++++++++
 .../assignment/SubscriptionInfoTest.java        |  57 +++-
 .../StreamThreadStateStoreProviderTest.java     |   5 +-
 25 files changed, 1599 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index f81c496..241e809 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -37,22 +37,6 @@ public class DefaultPartitioner implements Partitioner {
 
     private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
 
-    /**
-     * A cheap way to deterministically convert a number to a positive value. When the input is
-     * positive, the original value is returned. When the input number is negative, the returned
-     * positive value is the original value bit AND against 0x7fffffff which is not its absolutely
-     * value.
-     *
-     * Note: changing this method in the future will possibly cause partition selection not to be
-     * compatible with the existing messages already placed on a partition.
-     *
-     * @param number a given number
-     * @return a positive number.
-     */
-    private static int toPositive(int number) {
-        return number & 0x7fffffff;
-    }
-
     public void configure(Map<String, ?> configs) {}
 
     /**
@@ -72,15 +56,15 @@ public class DefaultPartitioner implements Partitioner {
             int nextValue = counter.getAndIncrement();
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
             if (availablePartitions.size() > 0) {
-                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
+                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                 return availablePartitions.get(part).partition();
             } else {
                 // no partitions are available, give a non-available partition
-                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
+                return Utils.toPositive(nextValue) % numPartitions;
             }
         } else {
             // hash the keyBytes to choose a partition
-            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e740618..4629baf 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -699,4 +699,20 @@ public class Utils {
             throw exception;
     }
 
+    /**
+     * A cheap way to deterministically convert a number to a positive value. When the input is
+     * positive, the original value is returned. When the input number is negative, the returned
+     * positive value is the original value bit-ANDed with 0x7fffffff, which is not its absolute
+     * value.
+     *
+     * Note: changing this method in the future will possibly cause partition selection not to be
+     * compatible with the existing messages already placed on a partition since it is used
+     * in producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}
+     *
+     * @param number a given number
+     * @return a positive number.
+     */
+    public static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b9553c9..4fabdf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -22,28 +22,34 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
  * sends output to zero or more output topics.
@@ -104,6 +110,7 @@ public class KafkaStreams {
     // of the co-location of stream thread's consumers. It is for internal
     // usage only and should not be exposed to users at all.
     private final UUID processId;
+    private StreamsMetadataState streamsMetadataState;
 
     private final StreamsConfig config;
 
@@ -164,11 +171,19 @@ public class KafkaStreams {
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
+        streamsMetadataState = new StreamsMetadataState(builder);
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time);
+            this.threads[i] = new StreamThread(builder,
+                                               config,
+                                               clientSupplier,
+                                               applicationId,
+                                               clientId,
+                                               processId,
+                                               metrics,
+                                               time,
+                                               streamsMetadataState);
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
-
         this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
     }
 
@@ -274,6 +289,77 @@ public class KafkaStreams {
             thread.setUncaughtExceptionHandler(eh);
     }
 
+
+    /**
+     * Find all of the instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @return collection containing all instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to
+     */
+    public Collection<StreamsMetadata> allMetadata() {
+        validateIsRunning();
+        return streamsMetadataState.getAllMetadata();
+    }
+
+
+    /**
+     * Find instances of {@link StreamsMetadata} that contains the given storeName
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @param storeName the storeName to find metadata for
+     * @return  A collection containing instances of {@link StreamsMetadata} that have the provided storeName
+     */
+    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+        validateIsRunning();
+        return streamsMetadataState.getAllMetadataForStore(storeName);
+    }
+
+    /**
+     * Find the {@link StreamsMetadata} instance that contains the given storeName
+     * and the corresponding hosted store instance contains the given key. This will use
+     * the {@link org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner} to
+     * locate the partition. If a custom partitioner has been used please use
+     * {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}
+     *
+     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+     * this method provides a way of finding which host it would exist on.
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @param storeName         Name of the store
+     * @param key               Key to use for the partition
+     * @param keySerializer     Serializer for the key
+     * @param <K>               key type
+     * @return  The {@link StreamsMetadata} for the storeName and key
+     */
+    public <K> StreamsMetadata metadataForKey(final String storeName,
+                                              final K key,
+                                              final Serializer<K> keySerializer) {
+        validateIsRunning();
+        return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
+    }
+
+    /**
+     * Find the {@link StreamsMetadata} instance that contains the given storeName
+     * and the corresponding hosted store instance contains the given key
+     *
+     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+     * this method provides a way of finding which host it would exist on.
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @param storeName         Name of the store
+     * @param key               Key to use for the partition
+     * @param partitioner       Partitioner for the store
+     * @param <K>               key type
+     * @return  The {@link StreamsMetadata} for the storeName and key
+     */
+    public <K> StreamsMetadata metadataForKey(final String storeName,
+                                              final K key,
+                                              final StreamPartitioner<K, ?> partitioner) {
+        validateIsRunning();
+        return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
+    }
+
+
     /**
      * Get a facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
      * with the provided storeName and accepted by {@link QueryableStoreType#accepts(StateStore)}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b624e0e..41498cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -109,6 +109,10 @@ public class StreamsConfig extends AbstractConfig {
     public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
     public static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
 
+    /**<code>user.endpoint</code> */
+    public static final String APPLICATION_SERVER_CONFIG = "application.server";
+    public static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
+
     /** <code>metrics.sample.window.ms</code> */
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
@@ -225,6 +229,11 @@ public class StreamsConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.LOW,
                                         CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                                .define(APPLICATION_SERVER_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.LOW,
+                                        APPLICATION_SERVER_DOC)
                                 .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                                         Type.CLASS,
                                         null,
@@ -319,6 +328,7 @@ public class StreamsConfig extends AbstractConfig {
         if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
             props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
 
+        props.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 1e30864..ba9873b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
+import static org.apache.kafka.common.utils.Utils.toPositive;
+
 public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
 
     private final WindowedSerializer<K> serializer;
@@ -45,8 +47,5 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
         return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
 
-    private static int toPositive(int number) {
-        return number & 0x7fffffff;
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 8e3dc7a..7b79236 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -68,6 +68,7 @@ public class TopologyBuilder {
     private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
     private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
     private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
     private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
     private String applicationId;
@@ -616,12 +617,40 @@ public class TopologyBuilder {
 
         NodeFactory nodeFactory = nodeFactories.get(processorName);
         if (nodeFactory instanceof ProcessorNodeFactory) {
-            ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
+            ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+            processorNodeFactory.addStateStore(stateStoreName);
+            connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory);
         } else {
             throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
         }
     }
 
+
+    private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
+        final Set<String> sourceTopics = new HashSet<>();
+        for (String parent : parents) {
+            NodeFactory nodeFactory = nodeFactories.get(parent);
+            if (nodeFactory instanceof SourceNodeFactory) {
+                sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
+            } else if (nodeFactory instanceof ProcessorNodeFactory) {
+                sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
+            }
+        }
+        return sourceTopics;
+    }
+
+    private void connectStateStoreNameToSourceTopics(final String stateStoreName,
+                                                     final ProcessorNodeFactory processorNodeFactory) {
+
+        final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
+        if (sourceTopics.isEmpty()) {
+            throw new TopologyBuilderException("can't find source topic for state store " +
+                    stateStoreName);
+        }
+        stateStoreNameToSourceTopics.put(stateStoreName,
+                Collections.unmodifiableSet(sourceTopics));
+    }
+
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
@@ -875,20 +904,25 @@ public class TopologyBuilder {
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
     public synchronized Set<String> sourceTopics() {
+        Set<String> topics = maybeDecorateInternalSourceTopics(sourceTopicNames);
+        return Collections.unmodifiableSet(topics);
+    }
+
+    private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopicNames) {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {
                 if (applicationId == null) {
                     throw new TopologyBuilderException("there are internal topics and "
                                                        + "applicationId is null. Call "
-                                                       + "setApplicationId before sourceTopics");
+                                                       + "setApplicationId first");
                 }
                 topics.add(applicationId + "-" + topic);
             } else {
                 topics.add(topic);
             }
         }
-        return Collections.unmodifiableSet(topics);
+        return topics;
     }
 
     public synchronized Pattern sourceTopicPattern() {
@@ -917,7 +951,8 @@ public class TopologyBuilder {
 
     /**
      * Set the applicationId. This is required before calling
-     * {@link #sourceTopics}, {@link #topicGroups} and {@link #copartitionSources}
+     * {@link #sourceTopics}, {@link #topicGroups}, {@link #copartitionSources}, and
+     * {@link #stateStoreNameToSourceTopics}
      * @param applicationId   the streams applicationId. Should be the same as set by
      * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
      */
@@ -925,4 +960,16 @@ public class TopologyBuilder {
         Objects.requireNonNull(applicationId, "applicationId can't be null");
         this.applicationId = applicationId;
     }
+
+    /**
+     * @return a mapping from state store name to a Set of source Topics.
+     */
+    public Map<String, Set<String>> stateStoreNameToSourceTopics() {
+        final Map<String, Set<String>> results = new HashMap<>();
+        for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) {
+            results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue()));
+
+        }
+        return results;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
new file mode 100644
index 0000000..006f010
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
+
+    private final Serializer<K> keySerializer;
+    private final Cluster cluster;
+    private final String topic;
+    private final DefaultPartitioner defaultPartitioner;
+
+    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
+        this.keySerializer = keySerializer;
+        this.cluster = cluster;
+        this.topic = topic;
+        this.defaultPartitioner = new DefaultPartitioner();
+    }
+
+    @Override
+    public Integer partition(final K key, final V value, final int numPartitions) {
+        final byte[] keyBytes = keySerializer.serialize(topic, key);
+        return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 4b52511..fd70a01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,7 +24,9 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -33,7 +35,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +55,10 @@ import java.util.UUID;
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+    private String userEndPointConfig;
+    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+    private Cluster metadataWithInternalTopics;
+
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
@@ -119,6 +125,25 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
+        String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
+        if (userEndPoint != null && !userEndPoint.isEmpty()) {
+            final String[] hostPort = userEndPoint.split(":");
+            if (hostPort.length != 2) {
+                throw new ConfigException(String.format("Config %s isn't in the correct format. Expected a host:port pair" +
+                                                       " but received %s",
+                                                        StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+            } else {
+                try {
+                    Integer.valueOf(hostPort[1]);
+                    this.userEndPointConfig = userEndPoint;
+                } catch (NumberFormatException nfe) {
+                    throw new ConfigException(String.format("Invalid port %s supplied in %s for config %s",
+                                                           hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
+                }
+            }
+
+        }
+
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
                     (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
@@ -143,7 +168,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
+        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
 
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
@@ -228,6 +253,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Map<UUID, Set<String>> consumersByClient = new HashMap<>();
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
         SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
         // decode subscription info
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
@@ -239,7 +265,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
 
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-
+            if (info.userEndPoint != null) {
+                final String[] hostPort = info.userEndPoint.split(":");
+                consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
+            }
             Set<String> consumers = consumersByClient.get(info.processId);
             if (consumers == null) {
                 consumers = new HashSet<>();
@@ -327,7 +356,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
         internalSourceTopicToTaskIds.clear();
 
-        Cluster metadataWithInternalTopics = metadata;
+        metadataWithInternalTopics = metadata;
         if (internalTopicManager != null)
             metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
 
@@ -361,8 +390,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         // assign tasks to clients
         states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
-        Map<String, Assignment> assignment = new HashMap<>();
 
+        final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
+
+        final Map<HostInfo, Set<TopicPartition>> endPointMap = new HashMap<>();
         for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
             UUID processId = entry.getKey();
             Set<String> consumers = entry.getValue();
@@ -408,14 +439,24 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 for (AssignedPartition partition : assignedPartitions) {
                     active.add(partition.taskId);
                     activePartitions.add(partition.partition);
+                    HostInfo hostInfo = consumerEndPointMap.get(processId);
+                    if (hostInfo != null) {
+                        if (!endPointMap.containsKey(hostInfo)) {
+                            endPointMap.put(hostInfo, new HashSet<TopicPartition>());
+                        }
+                        final Set<TopicPartition> topicPartitions = endPointMap.get(hostInfo);
+                        topicPartitions.add(partition.partition);
+                    }
                 }
 
-                AssignmentInfo data = new AssignmentInfo(active, standby);
-                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
-                i++;
 
-                active.clear();
-                standby.clear();
+                assignmentSuppliers.add(new AssignmentSupplier(consumer,
+                                                               active,
+                                                               standby,
+                                                               endPointMap,
+                                                               activePartitions));
+
+                i++;
             }
         }
 
@@ -424,9 +465,39 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // change log topics should be compacted
         prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, true);
 
+        Map<String, Assignment> assignment = new HashMap<>();
+        for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
+            assignment.put(assignmentSupplier.consumer, assignmentSupplier.get());
+        }
         return assignment;
     }
 
+    /**
+     * Defers construction of each consumer's {@link Assignment} until after the
+     * assignment loop has fully populated the per-host partition map, so that the
+     * encoded {@link AssignmentInfo} sees the complete endpoint state.
+     *
+     * NOTE(review): all instances share the same mutable endPointMap, which is
+     * still being filled while suppliers are created; {@link #get()} is only safe
+     * to call once the loop over all clients has completed (as assign() does).
+     */
+    class AssignmentSupplier {
+        private final String consumer;
+        private final List<TaskId> active;
+        private final Map<TaskId, Set<TopicPartition>> standby;
+        private final Map<HostInfo, Set<TopicPartition>> endPointMap;
+        private final List<TopicPartition> activePartitions;
+
+        AssignmentSupplier(final String consumer,
+                           final List<TaskId> active,
+                           final Map<TaskId, Set<TopicPartition>> standby,
+                           final Map<HostInfo, Set<TopicPartition>> endPointMap,
+                           final List<TopicPartition> activePartitions) {
+            this.consumer = consumer;
+            this.active = active;
+            this.standby = standby;
+            this.endPointMap = endPointMap;
+            this.activePartitions = activePartitions;
+        }
+
+        // Lazily encodes the AssignmentInfo so it captures the final endPointMap state.
+        Assignment get() {
+            return new Assignment(activePartitions, new AssignmentInfo(active,
+                                                                       standby,
+                                                                       endPointMap).encode());
+        }
+    }
+
     /**
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
@@ -460,6 +531,18 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
         this.partitionToTaskIds = partitionToTaskIds;
+        this.partitionsByHostState = info.partitionsByHostState;
+    }
+
+    /**
+     * The mapping of host to topic partitions as decoded from the most recently
+     * received {@link AssignmentInfo}; empty until an assignment has been received.
+     *
+     * @return an unmodifiable view of the host-to-partitions mapping
+     */
+    public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+        if (partitionsByHostState == null) {
+            return Collections.emptyMap();
+        }
+        return Collections.unmodifiableMap(partitionsByHostState);
+    }
+
+    /**
+     * The cluster metadata, including any internally-created topics.
+     * NOTE(review): metadataWithInternalTopics is assigned inside assign(), which
+     * runs on the group leader only — confirm callers tolerate a null return on
+     * non-leader instances.
+     */
+    public Cluster clusterMetadata() {
+        return metadataWithInternalTopics;
     }
 
     private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c84cae0..f416443 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +70,7 @@ public class StreamThread extends Thread {
     private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
+    private final StreamsMetadataState streamsMetadataState;
     public final String applicationId;
     public final String clientId;
     public final UUID processId;
@@ -110,6 +112,7 @@ public class StreamThread extends Thread {
                 addStreamTasks(assignment);
                 addStandbyTasks();
                 lastClean = time.milliseconds(); // start the cleaning cycle
+                streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -127,6 +130,7 @@ public class StreamThread extends Thread {
             } finally {
                 // TODO: right now upon partition revocation, we always remove all the tasks;
                 // this behavior can be optimized to only remove affected tasks in the future
+                streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
                 removeStreamTasks();
                 removeStandbyTasks();
             }
@@ -140,7 +144,8 @@ public class StreamThread extends Thread {
                         String clientId,
                         UUID processId,
                         Metrics metrics,
-                        Time time) {
+                        Time time,
+                        StreamsMetadataState streamsMetadataState) {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
         this.applicationId = applicationId;
@@ -151,6 +156,7 @@ public class StreamThread extends Thread {
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+        this.streamsMetadataState = streamsMetadataState;
 
         // set the producer and consumer clients
         String threadName = getName();
@@ -500,6 +506,8 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
+
+
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
new file mode 100644
index 0000000..eeb3bc9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StreamsMetadata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+
+/**
+ * Provides access to the {@link StreamsMetadata} in a KafkaStreams application. This can be used
+ * to discover the locations of {@link org.apache.kafka.streams.processor.StateStore}s
+ * in a KafkaStreams application.
+ *
+ * <p>All public methods are {@code synchronized}: the metadata is rebuilt by the stream
+ * thread via {@link #onChange(Map, Cluster)} on every rebalance, while queries may arrive
+ * concurrently from user threads.
+ */
+public class StreamsMetadataState {
+    private final TopologyBuilder builder;
+    // Rebuilt in place on every rebalance; guarded by "this".
+    private final List<StreamsMetadata> allMetadata = new ArrayList<>();
+    // Latest cluster metadata supplied by onChange(); null until the first rebalance.
+    private Cluster clusterMetadata;
+
+    public StreamsMetadataState(final TopologyBuilder builder) {
+        this.builder = builder;
+    }
+
+    /**
+     * Find all of the {@link StreamsMetadata}s in a
+     * {@link KafkaStreams} application
+     *
+     * @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
+     */
+    public synchronized Collection<StreamsMetadata> getAllMetadata() {
+        // NOTE(review): this exposes the internal mutable list, which onChange()
+        // clears and refills in place; consider returning an unmodifiable copy.
+        return allMetadata;
+    }
+    
+    /**
+     * Find all of the {@link StreamsMetadata}s for a given storeName
+     *
+     * @param storeName the storeName to find metadata for
+     * @return A collection of {@link StreamsMetadata} that have the provided storeName
+     */
+    public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
+        Objects.requireNonNull(storeName, "storeName cannot be null");
+
+        // Used only as an existence check: an unknown store yields an empty result.
+        final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        if (sourceTopics == null) {
+            return Collections.emptyList();
+        }
+
+        final ArrayList<StreamsMetadata> results = new ArrayList<>();
+        for (StreamsMetadata metadata : allMetadata) {
+            if (metadata.stateStoreNames().contains(storeName)) {
+                results.add(metadata);
+            }
+        }
+        return results;
+    }
+
+    /**
+     * Find the {@link StreamsMetadata} for a given storeName and key. This method will use the
+     * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used
+     * please use {@link StreamsMetadataState#getMetadataWithKey(String, Object, StreamPartitioner)}
+     *
+     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+     * this method provides a way of finding which {@link StreamsMetadata} it would exist on.
+     *
+     * @param storeName     Name of the store
+     * @param key           Key to use
+     * @param keySerializer Serializer for the key
+     * @param <K>           key type
+     * @return The {@link StreamsMetadata} for the storeName and key, or {@code null}
+     *         if the store has no known source topics
+     */
+    public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
+                                                               final K key,
+                                                               final Serializer<K> keySerializer) {
+        Objects.requireNonNull(keySerializer, "keySerializer can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(key, "key can't be null");
+
+        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
+        if (sourceTopicsInfo == null) {
+            return null;
+        }
+        // Partition the key against the source topic with the most partitions so the
+        // computed partition id is valid for every (possibly smaller) source topic.
+        return getStreamsMetadataForKey(storeName,
+                                        key,
+                                        new DefaultStreamPartitioner<>(keySerializer,
+                                                                       clusterMetadata,
+                                                                       sourceTopicsInfo.topicWithMostPartitions),
+                                        sourceTopicsInfo);
+    }
+
+
+
+
+
+    /**
+     * Find the {@link StreamsMetadata} for a given storeName and key.
+     *
+     * Note: the key may not exist in the {@link StateStore},
+     * this method provides a way of finding which {@link StreamsMetadata} it would exist on.
+     *
+     * @param storeName   Name of the store
+     * @param key         Key to use
+     * @param partitioner partitioner to use to find correct partition for key
+     * @param <K>         key type
+     * @return The {@link StreamsMetadata} for the storeName and key, or {@code null}
+     *         if the store has no known source topics
+     */
+    public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
+                                                               final K key,
+                                                               final StreamPartitioner<K, ?> partitioner) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(key, "key can't be null");
+        Objects.requireNonNull(partitioner, "partitioner can't be null");
+
+        SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
+        if (sourceTopicsInfo == null) {
+            return null;
+        }
+        return getStreamsMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
+    }
+
+    /**
+     * Respond to changes to the HostInfo -> TopicPartition mapping. Will rebuild the
+     * metadata from scratch.
+     *
+     * @param currentState    the current mapping of {@link HostInfo} -> {@link TopicPartition}s
+     * @param clusterMetadata the current clusterMetadata {@link Cluster}
+     */
+    public synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> currentState, final Cluster clusterMetadata) {
+        this.clusterMetadata = clusterMetadata;
+        rebuildMetadata(currentState);
+    }
+
+    // True if any of the host's partitions belongs to one of the given topics.
+    private boolean hasPartitionsForAnyTopics(final Set<String> topicNames, final Set<TopicPartition> partitionForHost) {
+        for (TopicPartition topicPartition : partitionForHost) {
+            if (topicNames.contains(topicPartition.topic())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Recomputes allMetadata: one StreamsMetadata per host, listing every store for
+    // which that host owns at least one source-topic partition.
+    private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> currentState) {
+        allMetadata.clear();
+        if (currentState.isEmpty()) {
+            return;
+        }
+        final Map<String, Set<String>> stores = builder.stateStoreNameToSourceTopics();
+        for (Map.Entry<HostInfo, Set<TopicPartition>> entry : currentState.entrySet()) {
+            final HostInfo key = entry.getKey();
+            final Set<TopicPartition> partitionsForHost = new HashSet<>(entry.getValue());
+            final Set<String> storesOnHost = new HashSet<>();
+            for (Map.Entry<String, Set<String>> storeTopicEntry : stores.entrySet()) {
+                final Set<String> topicsForStore = storeTopicEntry.getValue();
+                if (hasPartitionsForAnyTopics(topicsForStore, partitionsForHost)) {
+                    storesOnHost.add(storeTopicEntry.getKey());
+                }
+            }
+            allMetadata.add(new StreamsMetadata(key, storesOnHost, partitionsForHost));
+        }
+    }
+
+    private <K> StreamsMetadata getStreamsMetadataForKey(final String storeName,
+                                                         final K key,
+                                                         final StreamPartitioner<K, ?> partitioner,
+                                                         final SourceTopicsInfo sourceTopicsInfo) {
+
+        // The value argument is not needed to pick a partition here, so null is passed.
+        final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
+        final Set<TopicPartition> matchingPartitions = new HashSet<>();
+        for (String sourceTopic : sourceTopicsInfo.sourceTopics) {
+            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
+        }
+
+        // Return the first host that has the store and owns one of the candidate partitions.
+        for (StreamsMetadata streamsMetadata : allMetadata) {
+            final Set<String> stateStoreNames = streamsMetadata.stateStoreNames();
+            final Set<TopicPartition> topicPartitions = new HashSet<>(streamsMetadata.topicPartitions());
+            topicPartitions.retainAll(matchingPartitions);
+            if (stateStoreNames.contains(storeName)
+                    && !topicPartitions.isEmpty()) {
+                return streamsMetadata;
+            }
+        }
+        return null;
+    }
+
+    // Returns null when the store has no registered source topics.
+    private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
+        final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        if (sourceTopics == null || sourceTopics.isEmpty()) {
+            return null;
+        }
+        return new SourceTopicsInfo(sourceTopics);
+    }
+
+    // Caches, per store, its source topics plus the topic with the highest partition
+    // count (used as the reference topic for default partitioning).
+    private class SourceTopicsInfo {
+        private final Set<String> sourceTopics;
+        private int maxPartitions;
+        private String topicWithMostPartitions;
+
+        private SourceTopicsInfo(final Set<String> sourceTopics) {
+            this.sourceTopics = sourceTopics;
+            for (String topic : sourceTopics) {
+                final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic);
+                if (partitions != null && partitions.size() > maxPartitions) {
+                    maxPartitions = partitions.size();
+                    // NOTE(review): partitions.get(0).topic() is always equal to "topic".
+                    topicWithMostPartitions = partitions.get(0).topic();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 0486e57..6569f85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import org.apache.kafka.common.record.ByteBufferInputStream;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,19 +40,28 @@ import java.util.Set;
 public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
-
+    /**
+     * A new field was added, partitionsByHostState. CURRENT_VERSION
+     * is required so we can decode the previous version. For example, this may occur
+     * during a rolling upgrade
+     */
+    private static final int CURRENT_VERSION = 2;
     public final int version;
     public final List<TaskId> activeTasks; // each element corresponds to a partition
     public final Map<TaskId, Set<TopicPartition>> standbyTasks;
+    public final Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
 
-    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
-        this(1, activeTasks, standbyTasks);
+    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          Map<HostInfo, Set<TopicPartition>> hostState) {
+        this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
+    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+                             Map<HostInfo, Set<TopicPartition>> hostState) {
         this.version = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
+        this.partitionsByHostState = hostState;
     }
 
     /**
@@ -63,43 +73,48 @@ public class AssignmentInfo {
         DataOutputStream out = new DataOutputStream(baos);
 
         try {
-            if (version == 1) {
-                // Encode version
-                out.writeInt(1);
-                // Encode active tasks
-                out.writeInt(activeTasks.size());
-                for (TaskId id : activeTasks) {
-                    id.writeTo(out);
-                }
-                // Encode standby tasks
-                out.writeInt(standbyTasks.size());
-                for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
-                    TaskId id = entry.getKey();
-                    id.writeTo(out);
-
-                    Set<TopicPartition> partitions = entry.getValue();
-                    out.writeInt(partitions.size());
-                    for (TopicPartition partition : partitions) {
-                        out.writeUTF(partition.topic());
-                        out.writeInt(partition.partition());
-                    }
-                }
-
-                out.flush();
-                out.close();
+            // Encode version
+            out.writeInt(version);
+            // Encode active tasks
+            out.writeInt(activeTasks.size());
+            for (TaskId id : activeTasks) {
+                id.writeTo(out);
+            }
+            // Encode standby tasks
+            out.writeInt(standbyTasks.size());
+            for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+                TaskId id = entry.getKey();
+                id.writeTo(out);
+
+                Set<TopicPartition> partitions = entry.getValue();
+                writeTopicPartitions(out, partitions);
+            }
+            out.writeInt(partitionsByHostState.size());
+            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHostState
+                    .entrySet()) {
+                final HostInfo hostInfo = entry.getKey();
+                out.writeUTF(hostInfo.host());
+                out.writeInt(hostInfo.port());
+                writeTopicPartitions(out, entry.getValue());
+            }
 
-                return ByteBuffer.wrap(baos.toByteArray());
+            out.flush();
+            out.close();
 
-            } else {
-                TaskAssignmentException ex = new TaskAssignmentException("Unable to encode assignment data: version=" + version);
-                log.error(ex.getMessage(), ex);
-                throw ex;
-            }
+            return ByteBuffer.wrap(baos.toByteArray());
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
         }
     }
 
+    /**
+     * Serializes a set of partitions as: an int count, then for each partition the
+     * topic name (modified UTF-8 via {@code writeUTF}) followed by the partition id.
+     * Mirrors {@link #readTopicPartitions(DataInputStream)}.
+     */
+    private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException {
+        out.writeInt(partitions.size());
+        for (TopicPartition partition : partitions) {
+            out.writeUTF(partition.topic());
+            out.writeInt(partition.partition());
+        }
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
      */
@@ -111,42 +126,55 @@ public class AssignmentInfo {
         try {
             // Decode version
             int version = in.readInt();
-            if (version == 1) {
-                // Decode active tasks
-                int count = in.readInt();
-                List<TaskId> activeTasks = new ArrayList<>(count);
-                for (int i = 0; i < count; i++) {
-                    activeTasks.add(TaskId.readFrom(in));
-                }
-                // Decode standby tasks
-                count = in.readInt();
-                Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
-                for (int i = 0; i < count; i++) {
-                    TaskId id = TaskId.readFrom(in);
-
-                    int numPartitions = in.readInt();
-                    Set<TopicPartition> partitions = new HashSet<>(numPartitions);
-                    for (int j = 0; j < numPartitions; j++) {
-                        partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
-                    }
-                    standbyTasks.put(id, partitions);
-                }
-
-                return new AssignmentInfo(activeTasks, standbyTasks);
-
-            } else {
+            if (version < 0 || version > CURRENT_VERSION) {
                 TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
                 log.error(ex.getMessage(), ex);
                 throw ex;
             }
+
+            // Decode active tasks
+            int count = in.readInt();
+            List<TaskId> activeTasks = new ArrayList<>(count);
+            for (int i = 0; i < count; i++) {
+                activeTasks.add(TaskId.readFrom(in));
+            }
+            // Decode standby tasks
+            count = in.readInt();
+            Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
+            for (int i = 0; i < count; i++) {
+                TaskId id = TaskId.readFrom(in);
+                standbyTasks.put(id, readTopicPartitions(in));
+            }
+
+            Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>();
+            if (version == CURRENT_VERSION) {
+                int numEntries = in.readInt();
+                for (int i = 0; i < numEntries; i++) {
+                    HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
+                    hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in));
+                }
+            }
+
+            return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
+
+
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }
     }
 
+    /**
+     * Deserializes a set of partitions previously written by
+     * {@link #writeTopicPartitions(DataOutputStream, Set)}: an int count followed
+     * by (topic, partition) pairs.
+     */
+    private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException {
+        int numPartitions = in.readInt();
+        Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+        for (int j = 0; j < numPartitions; j++) {
+            partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
+        }
+        return partitions;
+    }
+
     @Override
     public int hashCode() {
-        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode();
+        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHostState.hashCode();
     }
 
     @Override
@@ -155,7 +183,8 @@ public class AssignmentInfo {
             AssignmentInfo other = (AssignmentInfo) o;
             return this.version == other.version &&
                     this.activeTasks.equals(other.activeTasks) &&
-                    this.standbyTasks.equals(other.standbyTasks);
+                    this.standbyTasks.equals(other.standbyTasks) &&
+                    this.partitionsByHostState.equals(other.partitionsByHostState);
         } else {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 874fea8..c3481c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -31,54 +32,59 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 1;
+    private static final int CURRENT_VERSION = 2;
 
     public final int version;
     public final UUID processId;
     public final Set<TaskId> prevTasks;
     public final Set<TaskId> standbyTasks;
+    public final String userEndPoint;
 
-    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
-        this(CURRENT_VERSION, processId, prevTasks, standbyTasks);
+    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+        this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
         this.version = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
+        this.userEndPoint = userEndPoint;
     }
 
     /**
      * @throws TaskAssignmentException if method fails to encode the data
      */
     public ByteBuffer encode() {
-        if (version == CURRENT_VERSION) {
-            ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
-            // version
-            buf.putInt(version);
-            // encode client UUID
-            buf.putLong(processId.getMostSignificantBits());
-            buf.putLong(processId.getLeastSignificantBits());
-            // encode ids of previously running tasks
-            buf.putInt(prevTasks.size());
-            for (TaskId id : prevTasks) {
-                id.writeTo(buf);
-            }
-            // encode ids of cached tasks
-            buf.putInt(standbyTasks.size());
-            for (TaskId id : standbyTasks) {
-                id.writeTo(buf);
-            }
-            buf.rewind();
-
-            return buf;
-
+        byte[] endPointBytes;
+        if (userEndPoint == null) {
+            endPointBytes = new byte[0];
         } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unable to encode subscription data: version=" + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+            endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8"));
         }
+        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 +
+                prevTasks.size() * 8 + 4 + standbyTasks.size() * 8
+                + 4 /* length of bytes */ + endPointBytes.length
+        );
+        // version
+        buf.putInt(version);
+        // encode client UUID
+        buf.putLong(processId.getMostSignificantBits());
+        buf.putLong(processId.getLeastSignificantBits());
+        // encode ids of previously running tasks
+        buf.putInt(prevTasks.size());
+        for (TaskId id : prevTasks) {
+            id.writeTo(buf);
+        }
+        // encode ids of cached tasks
+        buf.putInt(standbyTasks.size());
+        for (TaskId id : standbyTasks) {
+            id.writeTo(buf);
+        }
+        buf.putInt(endPointBytes.length);
+        buf.put(endPointBytes);
+        buf.rewind();
+        return buf;
     }
 
     /**
@@ -90,7 +96,7 @@ public class SubscriptionInfo {
 
         // Decode version
         int version = data.getInt();
-        if (version == CURRENT_VERSION) {
+        if (version == CURRENT_VERSION || version == 1) {
             // Decode client UUID
             UUID processId = new UUID(data.getLong(), data.getLong());
             // Decode previously active tasks
@@ -107,7 +113,17 @@ public class SubscriptionInfo {
                 standbyTasks.add(TaskId.readFrom(data));
             }
 
-            return new SubscriptionInfo(version, processId, prevTasks, standbyTasks);
+            String userEndPoint = null;
+            if (version == CURRENT_VERSION) {
+                int bytesLength = data.getInt();
+                if (bytesLength != 0) {
+                    byte[] bytes = new byte[bytesLength];
+                    data.get(bytes);
+                    userEndPoint = new String(bytes, Charset.forName("UTF-8"));
+                }
+
+            }
+            return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint);
 
         } else {
             TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
@@ -118,7 +134,11 @@ public class SubscriptionInfo {
 
     @Override
     public int hashCode() {
-        return version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        if (userEndPoint == null) {
+            return hashCode;
+        }
+        return hashCode ^ userEndPoint.hashCode();
     }
 
     @Override
@@ -128,7 +148,8 @@ public class SubscriptionInfo {
             return this.version == other.version &&
                     this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
-                    this.standbyTasks.equals(other.standbyTasks);
+                    this.standbyTasks.equals(other.standbyTasks) &&
+                    (this.userEndPoint != null ? this.userEndPoint.equals(other.userEndPoint) : other.userEndPoint == null);
         } else {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
new file mode 100644
index 0000000..37a15e1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+/**
+ * Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
+ * Instances of this class can be obtained by calling one of:
+ *  {@link KafkaStreams#allMetadata()}
+ *  {@link KafkaStreams#allMetadataForStore(String)}
+ *  {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}
+ *  {@link KafkaStreams#metadataForKey(String, Object, Serializer)}
+ *
+ *  The HostInfo is constructed during Partition Assignment
+ *  see {@link org.apache.kafka.streams.processor.internals.StreamPartitionAssignor}
+ *  It is extracted from the config {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG}
+ *
+ *  If developers wish to expose an endpoint in their KafkaStreams applications they should provide the above
+ *  config.
+ */
+public class HostInfo {
+    private final String host;
+    private final int port;
+
+    public HostInfo(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        HostInfo hostInfo = (HostInfo) o;
+
+        if (port != hostInfo.port) return false;
+        return host.equals(hostInfo.host);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = host.hashCode();
+        result = 31 * result + port;
+        return result;
+    }
+
+    public String host() {
+        return host;
+    }
+
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public String toString() {
+        return "HostInfo{" +
+                "host='" + host + '\'' +
+                ", port=" + port +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
new file mode 100644
index 0000000..541221f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadata {
+    private final HostInfo hostInfo;
+    private final Set<String> stateStoreNames;
+    private final Set<TopicPartition> topicPartitions;
+
+    public StreamsMetadata(final HostInfo hostInfo,
+                           final Set<String> stateStoreNames,
+                           final Set<TopicPartition> topicPartitions) {
+
+        this.hostInfo = hostInfo;
+        this.stateStoreNames = stateStoreNames;
+        this.topicPartitions = topicPartitions;
+    }
+
+    public HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    public Set<String> stateStoreNames() {
+        return stateStoreNames;
+    }
+
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    public String host() {
+        return hostInfo.host();
+    }
+    public int port() {
+        return hostInfo.port();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        final StreamsMetadata that = (StreamsMetadata) o;
+
+        if (!hostInfo.equals(that.hostInfo)) return false;
+        if (!stateStoreNames.equals(that.stateStoreNames)) return false;
+        return topicPartitions.equals(that.topicPartitions);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = hostInfo.hashCode();
+        result = 31 * result + stateStoreNames.hashCode();
+        result = 31 * result + topicPartitions.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "StreamsMetadata{" +
+                "hostInfo=" + hostInfo +
+                ", stateStoreNames=" + stateStoreNames +
+                ", topicPartitions=" + topicPartitions +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 29b8b03..b15c2ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
@@ -34,7 +36,7 @@ import static org.junit.Assert.assertTrue;
 public class KafkaStreamsTest {
 
     // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
-    // quick enough
+    // quick enough)
     @ClassRule
     public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
 
@@ -118,6 +120,45 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.allMetadata();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.allMetadataForStore("store");
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.metadataForKey("store", "key", Serdes.String().serializer());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 0;
+            }
+        });
+    }
+
+
+    private KafkaStreams createKafkaStreams() {
+        Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        KStreamBuilder builder = new KStreamBuilder();
+        return new KafkaStreams(builder, props);
+    }
+
     @Test
     public void testCleanup() throws Exception {
         final Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 5a30af5..0892893 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
@@ -338,7 +339,7 @@ public class RegexSourceIntegrationTest {
         private int index =  0;
 
         public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
-            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index c776b8a..b951743 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -26,6 +26,9 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.After;
 import org.junit.Test;
 
+import java.util.Map;
+import java.util.Set;
+
 import static org.junit.Assert.assertEquals;
 
 public class KStreamBuilderTest {
@@ -89,6 +92,54 @@ public class KStreamBuilderTest {
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
     }
 
+    @Test
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+        final String topic3 = "topic-3";
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> source3 = builder.stream(topic3);
+
+        final KStream<String, String> merged = builder.merge(source1, source2, source3);
+        merged.groupByKey().count("my-table");
+        final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
+    }
+
+    @Test
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors() throws Exception {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> processedSource1 =
+                source1.mapValues(new ValueMapper<String, String>() {
+                    @Override
+                    public String apply(final String value) {
+                        return value;
+                    }
+                }).filter(new Predicate<String, String>() {
+                    @Override
+                    public boolean test(final String key, final String value) {
+                        return true;
+                    }
+                });
+        final KStream<String, String> processedSource2 = source2.filter(new Predicate<String, String>() {
+            @Override
+            public boolean test(final String key, final String value) {
+                return true;
+            }
+        });
+
+        final KStream<String, String> merged = builder.merge(processedSource1, processedSource2);
+        merged.groupByKey().count("my-table");
+        final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkSet("topic-1", "topic-2"), actual.get("my-table"));
+    }
+
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
         new KStreamBuilder().stream();

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f6ca6db..6f047b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -404,4 +404,39 @@ public class TopologyBuilderTest {
         return nodeNames;
     }
 
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+        final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), false, "processor");
+        final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
+    @Test
+    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addInternalTopic("internal-topic");
+        builder.addSource("source", "internal-topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+        final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singleton("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
 }


Mime
View raw message