kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer
Date Wed, 05 Apr 2017 09:31:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 49f80b236 -> aea146511


HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2777 from mjsax/hotfix-window-serdes-trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aea14651
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aea14651
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aea14651

Branch: refs/heads/trunk
Commit: aea14651184476940c69238535de5143e61f4c31
Parents: 49f80b2
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Apr 5 10:31:06 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Apr 5 10:31:06 2017 +0100

----------------------------------------------------------------------
 .../streams/kstream/internals/KStreamImpl.java  | 12 ++---
 .../kstream/internals/SessionKeySerde.java      | 17 +++---
 .../internals/WindowedStreamPartitioner.java    |  8 +--
 .../apache/kafka/streams/state/StateSerdes.java | 57 +++++++++++---------
 .../state/internals/CachingKeyValueStore.java   |  8 +--
 .../state/internals/CachingSessionStore.java    | 23 ++++----
 .../state/internals/CachingWindowStore.java     | 14 ++---
 .../ChangeLoggingKeyValueBytesStore.java        |  9 +++-
 .../internals/ChangeLoggingKeyValueStore.java   |  7 +--
 .../ChangeLoggingSegmentedBytesStore.java       |  9 +++-
 .../internals/InMemoryKeyValueLoggedStore.java  | 10 ++--
 .../state/internals/InMemoryKeyValueStore.java  |  8 +--
 .../streams/state/internals/MemoryLRUCache.java |  8 +--
 .../MergedSortedCacheSessionStoreIterator.java  |  4 +-
 .../internals/RocksDBSegmentedBytesStore.java   |  3 ++
 .../state/internals/RocksDBSessionStore.java    | 16 ++++--
 .../internals/RocksDBSessionStoreSupplier.java  |  6 +--
 .../streams/state/internals/RocksDBStore.java   |  8 +--
 .../state/internals/RocksDBWindowStore.java     |  7 +--
 .../state/internals/SegmentedBytesStore.java    |  8 +++
 .../state/internals/SessionKeySchema.java       | 10 +++-
 .../state/internals/WindowKeySchema.java        |  7 ++-
 .../state/internals/WindowStoreUtils.java       |  5 +-
 .../internals/WrappedSessionStoreIterator.java  |  4 +-
 .../kstream/internals/SessionKeySerdeTest.java  | 16 +++---
 .../WindowedStreamPartitionerTest.java          |  2 +-
 .../streams/state/KeyValueStoreTestDriver.java  | 34 ++++++------
 .../internals/CachingSessionStoreTest.java      |  6 ++-
 .../state/internals/CachingWindowStoreTest.java |  2 +-
 .../CompositeReadOnlyKeyValueStoreTest.java     |  2 +-
 ...gedSortedCacheKeyValueStoreIteratorTest.java |  2 +-
 ...tedCacheWrappedSessionStoreIteratorTest.java |  2 +-
 .../RocksDBSegmentedBytesStoreTest.java         |  9 ++--
 .../internals/RocksDBSessionStoreTest.java      |  5 +-
 .../state/internals/SessionKeySchemaTest.java   |  1 +
 .../state/internals/StateStoreTestUtils.java    | 16 ++++--
 .../state/internals/StoreChangeLoggerTest.java  |  6 +--
 .../internals/WrappingStoreProviderTest.java    |  4 +-
 38 files changed, 229 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 5eabc2c..bbd4ac4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -360,16 +360,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @SuppressWarnings("unchecked")
     @Override
-    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
+    public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) {
         Objects.requireNonNull(topic, "topic can't be null");
-        String name = topology.newName(SINK_NAME);
+        final String name = topology.newName(SINK_NAME);
 
-        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
+        final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
+        final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
 
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
-            WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+            final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
+            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
         }
 
         topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 7a85c77..3b57d95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -33,7 +33,6 @@ import java.util.Map;
  */
 public class SessionKeySerde<K> implements Serde<Windowed<K>> {
     private static final int TIMESTAMP_SIZE = 8;
-    private static final String SESSIONKEY = "sessionkey";
 
     private final Serde<K> keySerde;
 
@@ -77,7 +76,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
             if (data == null) {
                 return null;
             }
-            return toBinary(data, keySerializer).get();
+            return toBinary(data, keySerializer, topic).get();
         }
 
         @Override
@@ -102,7 +101,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
             if (data == null || data.length == 0) {
                 return null;
             }
-            return from(data, deserializer);
+            return from(data, deserializer, topic);
         }
 
 
@@ -133,8 +132,8 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         return bytes;
     }
 
-    public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer) {
-        final K key = extractKey(binaryKey, keyDeserializer);
+    public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer, final String topic) {
+        final K key = extractKey(binaryKey, keyDeserializer, topic);
         final Window window = extractWindow(binaryKey);
         return new Windowed<>(key, window);
     }
@@ -147,12 +146,12 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), new SessionWindow(start, end));
     }
 
-    private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) {
-        return deserializer.deserialize(SESSIONKEY, extractKeyBytes(binaryKey));
+    private static <K> K extractKey(final byte[] binaryKey, final Deserializer<K> deserializer, final String topic) {
+        return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
     }
 
-    public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer) {
-        final byte[] bytes = serializer.serialize(SESSIONKEY, sessionKey.key());
+    public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) {
+        final byte[] bytes = serializer.serialize(topic, sessionKey.key());
         ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
         buf.put(bytes);
         buf.putLong(sessionKey.window().end());

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index f4a5e81..fa1ceae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -24,9 +24,11 @@ import static org.apache.kafka.common.utils.Utils.toPositive;
 
 public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
 
+    private final String topic;
     private final WindowedSerializer<K> serializer;
 
-    public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
+    WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) {
+        this.topic = topic;
         this.serializer = serializer;
     }
 
@@ -40,8 +42,8 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */
-    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
-        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+    public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) {
+        final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);
 
         // hash the keyBytes to choose a partition
         return toPositive(Utils.murmur2(keyBytes)) % numPartitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 4366311..d43c613 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -32,18 +32,21 @@ public final class StateSerdes<K, V> {
     /**
      * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes.
      *
-     * @param stateName   the name of the state
-     * @param keyClass    the class of the key type
-     * @param valueClass  the class of the value type
-     * @param <K>         the key type
-     * @param <V>         the value type
-     * @return            a new instance of {@link StateSerdes}
+     * @param topic      the topic name
+     * @param keyClass   the class of the key type
+     * @param valueClass the class of the value type
+     * @param <K>        the key type
+     * @param <V>        the value type
+     * @return a new instance of {@link StateSerdes}
      */
-    public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) {
-        return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
+    public static <K, V> StateSerdes<K, V> withBuiltinTypes(
+        final String topic,
+        final Class<K> keyClass,
+        final Class<V> valueClass) {
+        return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
     }
 
-    private final String stateName;
+    private final String topic;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
 
@@ -53,22 +56,26 @@ public final class StateSerdes<K, V> {
      * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
      * need to provide the topic name any more.
      *
-     * @param stateName     the name of the state
+     * @param topic         the topic name
      * @param keySerde      the serde for keys; cannot be null
      * @param valueSerde    the serde for values; cannot be null
      * @throws IllegalArgumentException if key or value serde is null
      */
     @SuppressWarnings("unchecked")
-    public StateSerdes(String stateName,
-                       Serde<K> keySerde,
-                       Serde<V> valueSerde) {
-        this.stateName = stateName;
-
-        if (keySerde == null)
+    public StateSerdes(final String topic,
+                       final Serde<K> keySerde,
+                       final Serde<V> valueSerde) {
+        if (topic == null) {
+            throw new IllegalArgumentException("topic cannot be null");
+        }
+        if (keySerde == null) {
             throw new IllegalArgumentException("key serde cannot be null");
-        if (valueSerde == null)
+        }
+        if (valueSerde == null) {
             throw new IllegalArgumentException("value serde cannot be null");
+        }
 
+        this.topic = topic;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
     }
@@ -128,12 +135,12 @@ public final class StateSerdes<K, V> {
     }
 
     /**
-     * Return the name of the state.
+     * Return the topic.
      *
-     * @return the name of the state
+     * @return the topic
      */
-    public String stateName() {
-        return stateName;
+    public String topic() {
+        return topic;
     }
 
     /**
@@ -143,7 +150,7 @@ public final class StateSerdes<K, V> {
      * @return        the key as typed object
      */
     public K keyFrom(byte[] rawKey) {
-        return keySerde.deserializer().deserialize(stateName, rawKey);
+        return keySerde.deserializer().deserialize(topic, rawKey);
     }
 
     /**
@@ -153,7 +160,7 @@ public final class StateSerdes<K, V> {
      * @return          the value as typed object
      */
     public V valueFrom(byte[] rawValue) {
-        return valueSerde.deserializer().deserialize(stateName, rawValue);
+        return valueSerde.deserializer().deserialize(topic, rawValue);
     }
 
     /**
@@ -163,7 +170,7 @@ public final class StateSerdes<K, V> {
      * @return     the serialized key
      */
     public byte[] rawKey(K key) {
-        return keySerde.serializer().serialize(stateName, key);
+        return keySerde.serializer().serialize(topic, key);
     }
 
     /**
@@ -173,6 +180,6 @@ public final class StateSerdes<K, V> {
      * @return       the serialized value
      */
     public byte[] rawValue(V value) {
-        return valueSerde.serializer().serialize(stateName, value);
+        return valueSerde.serializer().serialize(topic, value);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index d9ef688..2a720be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -51,11 +52,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         this.valueSerde = valueSerde;
     }
 
-    @Override
-    public String name() {
-        return underlying.name();
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
@@ -69,7 +65,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     @SuppressWarnings("unchecked")
     private void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
-        this.serdes = new StateSerdes<>(underlying.name(),
+        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index a4b46ff..bebd118 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -43,6 +44,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     private StateSerdes<K, AGG> serdes;
     private InternalProcessorContext context;
     private CacheFlushListener<Windowed<K>, AGG> flushListener;
+    private String topic;
 
     CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
                         final Serde<K> keySerde,
@@ -56,6 +58,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
 
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
+        topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
         bytesStore.init(context, root);
         initInternal((InternalProcessorContext) context);
     }
@@ -64,13 +67,15 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     private void initInternal(final InternalProcessorContext context) {
         this.context = context;
 
-        this.serdes = new StateSerdes<>(bytesStore.name(),
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
+        keySchema.init(topic);
+        serdes = new StateSerdes<>(
+            topic,
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
 
-        this.cacheName = context.taskId() + "-" + bytesStore.name();
-        this.cache = this.context.getCache();
+        cacheName = context.taskId() + "-" + bytesStore.name();
+        cache = context.getCache();
         cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
@@ -85,7 +90,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime) {
         validateStoreOpen();
-        final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(this.name(), key));
+        final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
                                                                                   keySchema.lowerRange(binarySessionId, earliestSessionEndTime),
                                                                                   keySchema.upperRange(binarySessionId, latestSessionStartTime));
@@ -106,7 +111,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     @Override
     public void put(final Windowed<K> key, AGG value) {
         validateStoreOpen();
-        final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer());
+        final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       key.window().end(), context.partition(), context.topic());
         cache.put(cacheName, binaryKey, entry);
@@ -122,12 +127,12 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
-            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer());
+            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
                 final AGG oldValue = fetchPrevious(binaryKey);
                 if (!(newValue == null && oldValue == null)) {
-                    flushListener.apply(key, newValue == null ? null : newValue, oldValue);
+                    flushListener.apply(key, newValue, oldValue);
                 }
             }
             bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue());

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 4003e54..f492573 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -62,17 +63,18 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     public void init(final ProcessorContext context, final StateStore root) {
         underlying.init(context, root);
         initInternal(context);
+        keySchema.init(context.applicationId());
     }
 
     @SuppressWarnings("unchecked")
     private void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
-        this.serdes = new StateSerdes<>(underlying.name(),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
+                                   keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                   valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
-        this.name = context.taskId() + "-" + underlying.name();
-        this.cache = this.context.getCache();
+        name = context.taskId() + "-" + underlying.name();
+        cache = this.context.getCache();
 
         cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -161,7 +163,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
 
         return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
                                                           underlyingIterator,
-                                                          new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
+                                                          new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde()));
     }
 
     private V fetchPrevious(final Bytes key, final long timestamp) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index f5ad3ac..8dc457a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -37,7 +38,13 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         inner.init(context, root);
-        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES);
+        this.changeLogger = new StoreChangeLogger<>(
+            inner.name(),
+            context,
+            WindowStoreUtils.getInnerStateSerde(
+                ProcessorStateManager.storeChangelogTopic(
+                    context.applicationId(),
+                    inner.name())));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
index c60278f..ea9f7aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -55,9 +56,9 @@ class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateSt
     public void init(final ProcessorContext context, final StateStore root) {
         innerBytes.init(context, root);
 
-        this.serdes = new StateSerdes<>(innerBytes.name(),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), innerBytes.name()),
+                                   keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                   valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 3082426..d23e115 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 /**
@@ -65,6 +66,12 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateSt
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         bytesStore.init(context, root);
-        changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES);
+        changeLogger = new StoreChangeLogger<>(
+            name(),
+            context,
+            WindowStoreUtils.getInnerStateSerde(
+                ProcessorStateManager.storeChangelogTopic(
+                    context.applicationId(),
+                    bytesStore.name())));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index bcc9819..638caad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -33,7 +34,6 @@ public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.Abstrac
     private final Serde<V> valueSerde;
 
     private StoreChangeLogger<K, V> changeLogger;
-    private ProcessorContext context;
 
     InMemoryKeyValueLoggedStore(final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
         super(inner);
@@ -45,13 +45,13 @@ public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.Abstrac
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        this.context = context;
         inner.init(context, root);
 
         // construct the serde
-        StateSerdes<K, V>  serdes = new StateSerdes<>(inner.name(),
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        StateSerdes<K, V>  serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index f63d2f1..41c6de3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -64,9 +65,10 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
         // construct the serde
-        this.serdes = new StateSerdes<>(name,
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        this.serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         if (root != null) {
             // register the store

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index cf78165..e6bba54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -103,9 +104,10 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
         // construct the serde
-        this.serdes = new StateSerdes<>(name,
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        this.serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         // register the store
         context.register(root, true, new StateRestoreCallback() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
index 72b73ed..3f9b620 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -36,7 +36,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted
     MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                           final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
                                           final StateSerdes<K, AGG> serdes) {
-        super(cacheIterator, storeIterator, new StateSerdes<>(serdes.stateName(),
+        super(cacheIterator, storeIterator, new StateSerdes<>(serdes.topic(),
                                                               new SessionKeySerde<>(serdes.keySerde()),
                                                               serdes.valueSerde()));
 
@@ -51,7 +51,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted
 
     @Override
     Windowed<K> deserializeCacheKey(final Bytes cacheKey) {
-        return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer());
+        return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer(), rawSerdes.topic());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index a9d9259..252a55f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.List;
@@ -89,6 +90,8 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public void init(ProcessorContext context, StateStore root) {
         this.context = context;
 
+        keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()));
+
         segments.openExisting(context);
 
         // register and possibly restore the state from the logs

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index cdd0dd7..5027781 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -35,6 +36,7 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     protected final SegmentedBytesStore bytesStore;
 
     protected StateSerdes<K, AGG> serdes;
+    protected String topic;
 
     // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
     private static class RocksDBSessionBytesStore extends RocksDBSessionStore<Bytes, byte[]> {
@@ -75,9 +77,13 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     @Override
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
-        this.serdes = new StateSerdes<>(bytesStore.name(),
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
+        final String storeName = bytesStore.name();
+        topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+
+        serdes = new StateSerdes<>(
+            topic,
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
         bytesStore.init(context, root);
     }
@@ -95,11 +101,11 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
 
     @Override
     public void remove(final Windowed<K> key) {
-        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer()));
+        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic));
     }
 
     @Override
     public void put(final Windowed<K> sessionKey, final AGG aggregate) {
-        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate));
+        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), aggSerde.serializer().serialize(topic, aggregate));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index 6743a7e..4e618d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -51,9 +51,9 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K,
     public SessionStore<K, V> get() {
         final SessionKeySchema keySchema = new SessionKeySchema();
         final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
-                                                                                     retentionPeriod,
-                                                                                     NUM_SEGMENTS,
-                                                                                     keySchema);
+                                                                                    retentionPeriod,
+                                                                                    NUM_SEGMENTS,
+                                                                                    keySchema);
 
         if (cached && logged) {
             final ChangeLoggingSegmentedBytesStore logged = new ChangeLoggingSegmentedBytesStore(segmented);

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 932ddd2..c879b91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -143,9 +144,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
         // we need to construct the serde while opening DB since
         // it is also triggered by windowed DB segments without initialization
-        this.serdes = new StateSerdes<>(name,
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        this.serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b82e416..5e8d0b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -77,9 +78,9 @@ class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
         // construct the serde
-        this.serdes = new StateSerdes<>(bytesStore.name(),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()),
+                                   keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                   valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         bytesStore.init(context, root);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 622ed08..0c3bb53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -66,6 +66,14 @@ public interface SegmentedBytesStore extends StateStore {
     byte[] get(Bytes key);
 
     interface KeySchema {
+
+        /**
+         * Initialized the schema with a topic.
+         *
+         * @param topic a topic name
+         */
+        void init(final String topic);
+
         /**
          * Given a record-key and a time, construct a Segmented key that represents
          * the upper range of keys to search when performing range queries.

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 7d6761c..80785b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -27,17 +27,23 @@ import java.util.List;
 
 
 class SessionKeySchema implements SegmentedBytesStore.KeySchema {
+    private String topic;
+
+    @Override
+    public void init(final String topic) {
+        this.topic = topic;
+    }
 
     @Override
     public Bytes upperRange(final Bytes key, final long to) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
-        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
+        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
     }
 
     @Override
     public Bytes lowerRange(final Bytes key, final long from) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
-        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
+        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 76faf0e..b9a8665 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -24,7 +24,12 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.List;
 
 class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
-    private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
+    private StateSerdes<Bytes, byte[]> serdes;
+
+    @Override
+    public void init(final String topic) {
+        serdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray());
+    }
 
     @Override
     public Bytes upperRange(final Bytes key, final long to) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index b93e39a..faf2899 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -31,7 +31,10 @@ public class WindowStoreUtils {
     /** Inner byte array serde used for segments */
     static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
     static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
-    static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
+
+    static StateSerdes<Bytes, byte[]> getInnerStateSerde(final String topic) {
+        return new StateSerdes<>(topic, INNER_KEY_SERDE, INNER_VALUE_SERDE);
+    }
 
     static <K> Bytes toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
index 819f263..6fd9636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
@@ -66,7 +66,7 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>,
     @Override
     public Windowed<K> peekNextKey() {
         final Bytes bytes = bytesIterator.peekNextKey();
-        return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer());
+        return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer(), serdes.topic());
     }
 
     @Override
@@ -77,7 +77,7 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>,
     @Override
     public KeyValue<Windowed<K>, V> next() {
         final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-        return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value));
+        return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
index 65c4d9f..aca3352 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -62,39 +62,39 @@ public class SessionKeySerdeTest {
 
     @Test
     public void shouldConvertToBinaryAndBack() throws Exception {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
-        final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer());
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
+        final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "dummy");
         assertEquals(windowedKey, result);
     }
 
     @Test
     public void shouldExtractEndTimeFromBinary() throws Exception {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertEquals(endTime, SessionKeySerde.extractEnd(serialized.get()));
     }
 
     @Test
     public void shouldExtractStartTimeFromBinary() throws Exception {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertEquals(startTime, SessionKeySerde.extractStart(serialized.get()));
     }
 
     @Test
     public void shouldExtractWindowFromBindary() throws Exception {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertEquals(window, SessionKeySerde.extractWindow(serialized.get()));
     }
 
     @Test
     public void shouldExtractKeyBytesFromBinary() throws Exception {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertArrayEquals(key.getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
     }
 
     @Test
     public void shouldExtractKeyFromBinary() throws Exception {
-        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
-        assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer()));
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
+        assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer(), "dummy"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 3af50d8..316494d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -69,7 +69,7 @@ public class WindowedStreamPartitionerTest {
         DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
 
         WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
-        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
+        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer);
 
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 08adbf4..b758799 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -16,16 +16,6 @@
  */
 package org.apache.kafka.streams.state;
 
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -54,6 +44,17 @@ import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
 /**
  * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
  * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
@@ -170,11 +171,12 @@ public class KeyValueStoreTestDriver<K, V> {
      * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
      * @return the test driver; never null
      */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer,
-                                                              Deserializer<K> keyDeserializer,
-                                                              Serializer<V> valueSerializer,
-                                                              Deserializer<V> valueDeserializer) {
-        StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected",
+    public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> keySerializer,
+                                                              final Deserializer<K> keyDeserializer,
+                                                              final Serializer<V> valueSerializer,
+                                                              final Deserializer<V> valueDeserializer) {
+        StateSerdes<K, V> serdes = new StateSerdes<K, V>(
+            "unexpected",
             Serdes.serdeFrom(keySerializer, keyDeserializer),
             Serdes.serdeFrom(valueSerializer, valueDeserializer));
         return new KeyValueStoreTestDriver<K, V>(serdes);
@@ -234,7 +236,7 @@ public class KeyValueStoreTestDriver<K, V> {
         this.stateDir.mkdirs();
 
         props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 59caaaf..7377ba2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -58,7 +58,9 @@ public class CachingSessionStoreTest {
 
     @Before
     public void setUp() throws Exception {
-        underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, new SessionKeySchema());
+        final SessionKeySchema schema = new SessionKeySchema();
+        schema.init("topic");
+        underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
         cachingStore = new CachingSessionStore<>(sessionStore,
                                                  Serdes.String(),
@@ -116,7 +118,7 @@ public class CachingSessionStoreTest {
         assertEquals(added.size() - 1, cache.size());
         final KeyValueIterator<Bytes, byte[]> iterator = underlying.fetch(Bytes.wrap(added.get(0).key.key().getBytes()), 0, 0);
         final KeyValue<Bytes, byte[]> next = iterator.next();
-        assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()));
+        assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"));
         assertArrayEquals(serdes.rawValue(added.get(0).value), next.value);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 297a88e..054e685 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -160,7 +160,7 @@ public class CachingWindowStoreTest {
     @Test
     public void shouldIterateCacheAndStore() throws Exception {
         final Bytes key = Bytes.wrap("1" .getBytes());
-        underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES), "a".getBytes());
+        underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes());
         cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
         final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
         assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index e19c4ef..2e5b872 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -61,7 +61,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     private KeyValueStore<String, String> newStoreInstance() {
-        return StateStoreTestUtils.newKeyValueStore(storeName, String.class, String.class);
+        return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 3f05428..6e0059f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.assertFalse;
 public class MergedSortedCacheKeyValueStoreIteratorTest {
 
     private final String namespace = "one";
-    private final StateSerdes<byte[], byte[]> serdes =  new StateSerdes<>(namespace, Serdes.ByteArray(), Serdes.ByteArray());
+    private final StateSerdes<byte[], byte[]> serdes =  new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray());
     private KeyValueStore<Bytes, byte[]> store;
     private ThreadCache cache;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
index 5f24fde..d3d8f40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
@@ -45,7 +45,7 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
     private final SessionWindow cacheWindow = new SessionWindow(10, 20);
     private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair(
             SessionKeySerde.toBinary(
-                    new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes())))
+                    new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy"), new LRUCacheEntry(cacheKey.getBytes())))
             .iterator();
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index ab13c24..bd335d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -54,11 +54,12 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Before
     public void before() {
-
+        final SessionKeySchema schema = new SessionKeySchema();
+        schema.init("topic");
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
                                                     retention,
                                                     numSegments,
-                                                    new SessionKeySchema());
+                                                    schema);
 
         stateDir = TestUtils.tempDirectory();
         final MockProcessorContext context = new MockProcessorContext(stateDir,
@@ -154,7 +155,7 @@ public class RocksDBSegmentedBytesStoreTest {
     }
 
     private Bytes serializeKey(final Windowed<String> key) {
-        return SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        return SessionKeySerde.toBinary(key, Serdes.String().serializer(), "dummy");
     }
 
     private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
@@ -162,7 +163,7 @@ public class RocksDBSegmentedBytesStoreTest {
         while (iterator.hasNext()) {
             final KeyValue<Bytes, byte[]> next = iterator.next();
             final KeyValue<Windowed<String>, Long> deserialized
-                    = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()), Serdes.Long().deserializer().deserialize("", next.value));
+                    = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"), Serdes.Long().deserializer().deserialize("", next.value));
             results.add(deserialized);
         }
         return results;

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 9be7c10..7f01aae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -48,8 +48,11 @@ public class RocksDBSessionStoreTest {
 
     @Before
     public void before() {
+        final SessionKeySchema schema = new SessionKeySchema();
+        schema.init("topic");
+
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 10000L, 3, new SessionKeySchema());
+                new RocksDBSegmentedBytesStore("session-store", 10000L, 3, schema);
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index 7c085dd..354cf01 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -40,6 +40,7 @@ public class SessionKeySchemaTest {
 
     @Before
     public void before() {
+        sessionKeySchema.init("topic");
         final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
                                                                   KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2),
                                                                   KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3),

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index 39cc5b5..d30372f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
@@ -28,7 +29,10 @@ import java.util.Collections;
 @SuppressWarnings("unchecked")
 public class StateStoreTestUtils {
 
-    public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) {
+    public static <K, V> KeyValueStore<K, V> newKeyValueStore(final String name,
+                                                              final String applicationId,
+                                                              final Class<K> keyType,
+                                                              final Class<V> valueType) {
         final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name,
                                                                                                  null,
                                                                                                  null,
@@ -37,8 +41,14 @@ public class StateStoreTestUtils {
                                                                                                  Collections.<String, String>emptyMap());
 
         final StateStore stateStore = supplier.get();
-        stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType),
-                new NoOpRecordCollector()), stateStore);
+        stateStore.init(
+            new MockProcessorContext(
+                StateSerdes.withBuiltinTypes(
+                    ProcessorStateManager.storeChangelogTopic(applicationId, name),
+                    keyType,
+                    valueType),
+                new NoOpRecordCollector()),
+            stateStore);
         return (KeyValueStore<K, V>) stateStore;
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 6edda74..311eaf6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -17,9 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
@@ -27,6 +24,9 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index 76a8747..9918672 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -42,9 +42,9 @@ public class WrappingStoreProviderTest {
         final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
 
 
-        stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
+        stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class));
         stubProviderOne.addStore("window", new NoOpWindowStore());
-        stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
+        stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class));
         stubProviderTwo.addStore("window", new NoOpWindowStore());
 
         wrappingStoreProvider = new WrappingStoreProvider(


Mime
View raw message