kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer
Date Wed, 05 Apr 2017 10:21:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 d64025911 -> 65c8f680e


HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2776 from mjsax/hotfix-window-serdes-0102


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65c8f680
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65c8f680
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65c8f680

Branch: refs/heads/0.10.2
Commit: 65c8f680ee4bf0efad82843736b1c63bc89c7d61
Parents: d640259
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Apr 5 11:21:20 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Apr 5 11:21:20 2017 +0100

----------------------------------------------------------------------
 .../streams/kstream/internals/KStreamImpl.java  | 12 ++--
 .../kstream/internals/SessionKeySerde.java      | 17 +++---
 .../internals/WindowedStreamPartitioner.java    |  8 ++-
 .../apache/kafka/streams/state/StateSerdes.java | 61 +++++++++++---------
 .../state/internals/CachingKeyValueStore.java   |  3 +-
 .../state/internals/CachingSessionStore.java    | 14 +++--
 .../state/internals/CachingWindowStore.java     |  7 ++-
 .../ChangeLoggingKeyValueBytesStore.java        |  9 ++-
 .../internals/ChangeLoggingKeyValueStore.java   |  3 +-
 .../ChangeLoggingSegmentedBytesStore.java       |  9 ++-
 .../internals/InMemoryKeyValueLoggedStore.java  | 10 ++--
 .../InMemoryKeyValueStoreSupplier.java          |  8 ++-
 .../streams/state/internals/MemoryLRUCache.java |  8 ++-
 .../MergedSortedCacheSessionStoreIterator.java  |  4 +-
 .../internals/RocksDBSegmentedBytesStore.java   |  3 +
 .../state/internals/RocksDBSessionStore.java    | 14 +++--
 .../streams/state/internals/RocksDBStore.java   |  8 ++-
 .../state/internals/RocksDBWindowStore.java     |  3 +-
 .../state/internals/SegmentedBytesStore.java    |  8 +++
 .../state/internals/SessionKeySchema.java       | 10 +++-
 .../state/internals/WindowStoreKeySchema.java   |  7 ++-
 .../state/internals/WindowStoreUtils.java       |  5 +-
 .../kstream/internals/SessionKeySerdeTest.java  | 10 ++--
 .../WindowedStreamPartitionerTest.java          |  2 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  8 +--
 .../internals/CachingSessionStoreTest.java      |  2 +-
 .../state/internals/CachingWindowStoreTest.java |  2 +-
 .../CompositeReadOnlyKeyValueStoreTest.java     |  2 +-
 ...rgedSortedCacheSessionStoreIteratorTest.java |  2 +-
 .../RocksDBSegmentedBytesStoreTest.java         |  4 +-
 .../state/internals/StateStoreTestUtils.java    | 13 ++++-
 .../internals/WrappingStoreProviderTest.java    |  4 +-
 32 files changed, 176 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0434f06..7b9fd94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -349,16 +349,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @SuppressWarnings("unchecked")
     @Override
-    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
+    public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) {
         Objects.requireNonNull(topic, "topic can't be null");
-        String name = topology.newName(SINK_NAME);
+        final String name = topology.newName(SINK_NAME);
 
-        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
+        final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
+        final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
 
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
-            WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+            final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
+            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
         }
 
         topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 249350e..6909773 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -33,7 +33,6 @@ import java.util.Map;
  */
 public class SessionKeySerde<K> implements Serde<Windowed<K>> {
     private static final int TIMESTAMP_SIZE = 8;
-    private static final String SESSIONKEY = "sessionkey";
 
     private final Serde<K> keySerde;
 
@@ -77,7 +76,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
             if (data == null) {
                 return null;
             }
-            return toBinary(data, keySerializer).get();
+            return toBinary(data, keySerializer, topic).get();
         }
 
         @Override
@@ -102,7 +101,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
             if (data == null || data.length == 0) {
                 return null;
             }
-            return from(data, deserializer);
+            return from(data, deserializer, topic);
         }
 
 
@@ -127,20 +126,20 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         return bytes;
     }
 
-    public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer) {
-        final K key = extractKey(binaryKey, keyDeserializer);
+    public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer, final String topic) {
+        final K key = extractKey(binaryKey, keyDeserializer, topic);
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
         return new Windowed<>(key, new SessionWindow(start, end));
     }
 
-    private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) {
-        return deserializer.deserialize(SESSIONKEY, extractKeyBytes(binaryKey));
+    private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer, final String topic) {
+        return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
     }
 
-    public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer) {
-        final byte[] bytes = serializer.serialize(SESSIONKEY, sessionKey.key());
+    public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) {
+        final byte[] bytes = serializer.serialize(topic, sessionKey.key());
         ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
         buf.put(bytes);
         buf.putLong(sessionKey.window().end());

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index ba9873b..ad54fcb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -24,9 +24,11 @@ import static org.apache.kafka.common.utils.Utils.toPositive;
 
 public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
 
+    private final String topic;
     private final WindowedSerializer<K> serializer;
 
-    public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
+    WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) {
+        this.topic = topic;
         this.serializer = serializer;
     }
 
@@ -40,8 +42,8 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */
-    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
-        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+    public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) {
+        final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);
 
         // hash the keyBytes to choose a partition
         return toPositive(Utils.murmur2(keyBytes)) % numPartitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index b19510c..059be37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -32,18 +32,21 @@ public final class StateSerdes<K, V> {
     /**
      * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes.
      *
-     * @param stateName   the name of the state
-     * @param keyClass    the class of the key type
-     * @param valueClass  the class of the value type
-     * @param <K>         the key type
-     * @param <V>         the value type
-     * @return            a new instance of {@link StateSerdes}
+     * @param topic      the topic name
+     * @param keyClass   the class of the key type
+     * @param valueClass the class of the value type
+     * @param <K>        the key type
+     * @param <V>        the value type
+     * @return           a new instance of {@link StateSerdes}
      */
-    public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) {
-        return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
+    public static <K, V> StateSerdes<K, V> withBuiltinTypes(
+        final String topic,
+        final Class<K> keyClass,
+        final Class<V> valueClass) {
+        return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
     }
 
-    private final String stateName;
+    private final String topic;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
 
@@ -53,22 +56,26 @@ public final class StateSerdes<K, V> {
      * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
      * need to provide the topic name any more.
      *
-     * @param stateName     the name of the state
-     * @param keySerde      the serde for keys; cannot be null
-     * @param valueSerde    the serde for values; cannot be null
+     * @param topic      the topic name
+     * @param keySerde   the serde for keys; cannot be null
+     * @param valueSerde the serde for values; cannot be null
      * @throws IllegalArgumentException if key or value serde is null
      */
     @SuppressWarnings("unchecked")
-    public StateSerdes(String stateName,
-                       Serde<K> keySerde,
-                       Serde<V> valueSerde) {
-        this.stateName = stateName;
-
-        if (keySerde == null)
+    public StateSerdes(final String topic,
+                       final Serde<K> keySerde,
+                       final Serde<V> valueSerde) {
+        if (topic == null) {
+            throw new IllegalArgumentException("topic cannot be null");
+        }
+        if (keySerde == null) {
             throw new IllegalArgumentException("key serde cannot be null");
-        if (valueSerde == null)
+        }
+        if (valueSerde == null) {
             throw new IllegalArgumentException("value serde cannot be null");
+        }
 
+        this.topic = topic;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
     }
@@ -128,12 +135,12 @@ public final class StateSerdes<K, V> {
     }
 
     /**
-     * Return the name of the state.
+     * Return the topic.
      *
-     * @return the name of the state
+     * @return the topic
      */
-    public String stateName() {
-        return stateName;
+    public String topic() {
+        return topic;
     }
 
     /**
@@ -143,7 +150,7 @@ public final class StateSerdes<K, V> {
      * @return        the key as typed object
      */
     public K keyFrom(byte[] rawKey) {
-        return keySerde.deserializer().deserialize(stateName, rawKey);
+        return keySerde.deserializer().deserialize(topic, rawKey);
     }
 
     /**
@@ -153,7 +160,7 @@ public final class StateSerdes<K, V> {
      * @return          the value as typed object
      */
     public V valueFrom(byte[] rawValue) {
-        return valueSerde.deserializer().deserialize(stateName, rawValue);
+        return valueSerde.deserializer().deserialize(topic, rawValue);
     }
 
     /**
@@ -163,7 +170,7 @@ public final class StateSerdes<K, V> {
      * @return     the serialized key
      */
     public byte[] rawKey(K key) {
-        return keySerde.serializer().serialize(stateName, key);
+        return keySerde.serializer().serialize(topic, key);
     }
 
     /**
@@ -173,6 +180,6 @@ public final class StateSerdes<K, V> {
      * @return       the serialized value
      */
     public byte[] rawValue(V value) {
-        return valueSerde.serializer().serialize(stateName, value);
+        return valueSerde.serializer().serialize(topic, value);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 1e91b47..f9f3077 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -69,7 +70,7 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
     @SuppressWarnings("unchecked")
     void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
-        this.serdes = new StateSerdes<>(underlying.name(),
+        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 6cfbf81..ed64246 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -43,6 +44,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
     private StateSerdes<K, AGG> serdes;
     private ThreadCache cache;
     private CacheFlushListener<Windowed<K>, AGG> flushListener;
+    private String topic;
 
     CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
                         final Serde<K> keySerde,
@@ -58,7 +60,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime) {
         validateStoreOpen();
-        final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(this.name(), key));
+        final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
                                                                                   keySchema.lowerRange(binarySessionId,
                                                                                                        earliestSessionEndTime).get(),
@@ -79,7 +81,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
 
     public void put(final Windowed<K> key, AGG value) {
         validateStoreOpen();
-        final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer());
+        final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       key.window().end(), context.partition(), context.topic());
         cache.put(cacheName, binaryKey.get(), entry);
@@ -92,6 +94,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
 
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
+        topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
         bytesStore.init(context, root);
         initInternal((InternalProcessorContext) context);
     }
@@ -100,7 +103,8 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
     private void initInternal(final InternalProcessorContext context) {
         this.context = context;
 
-        this.serdes = new StateSerdes<>(bytesStore.name(),
+        keySchema.init(topic);
+        this.serdes = new StateSerdes<>(topic,
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
@@ -123,12 +127,12 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
-            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer());
+            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
                 final AGG oldValue = fetchPrevious(binaryKey);
                 if (!(newValue == null && oldValue == null)) {
-                    flushListener.apply(key, newValue == null ? null : newValue, oldValue);
+                    flushListener.apply(key, newValue, oldValue);
                 }
             }
             bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue());

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 33df426..37ce336 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -62,12 +63,13 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
     public void init(final ProcessorContext context, final StateStore root) {
         underlying.init(context, root);
         initInternal(context);
+        keySchema.init(context.applicationId());
     }
 
     @SuppressWarnings("unchecked")
     void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
-        this.serdes = new StateSerdes<>(underlying.name(),
+        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
@@ -157,8 +159,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
 
         return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
                                                           underlyingIterator,
-                                                          new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
-
+                                                          new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde()));
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index e31d04b..9ece123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -37,7 +38,13 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractW
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         inner.init(context, root);
-        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES);
+        this.changeLogger = new StoreChangeLogger<>(
+            inner.name(),
+            context,
+            WindowStoreUtils.getInnerStateSerde(
+                ProcessorStateManager.storeChangelogTopic(
+                    context.applicationId(),
+                    inner.name())));
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
index cd63d1a..d50f907 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -54,7 +55,7 @@ class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractWrapped
     public void init(final ProcessorContext context, final StateStore root) {
         innerBytes.init(context, root);
 
-        this.serdes = new StateSerdes<>(innerBytes.name(),
+        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), innerBytes.name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 21c2866..2a5a1a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 /**
@@ -66,6 +67,12 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractWrapped
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         bytesStore.init(context, root);
-        changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES);
+        changeLogger = new StoreChangeLogger<>(
+            name(),
+            context,
+            WindowStoreUtils.getInnerStateSerde(
+                ProcessorStateManager.storeChangelogTopic(
+                    context.applicationId(),
+                    name())));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index d81f6fb..f402390 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -35,7 +36,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     private final String storeName;
 
     private StoreChangeLogger<K, V> changeLogger;
-    private ProcessorContext context;
 
     public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
         this.storeName = storeName;
@@ -52,13 +52,13 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        this.context = context;
         inner.init(context, root);
 
         // construct the serde
-        StateSerdes<K, V>  serdes = new StateSerdes<>(storeName,
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        StateSerdes<K, V>  serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index e00f8ab..05130ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -94,9 +95,10 @@ public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K
         @SuppressWarnings("unchecked")
         public void init(ProcessorContext context, StateStore root) {
             // construct the serde
-            this.serdes = new StateSerdes<>(name,
-                    keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                    valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            this.serdes = new StateSerdes<>(
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
             // register the store
             context.register(root, true, new StateRestoreCallback() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 4e1f40e..4ab6fab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -103,9 +104,10 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
         // construct the serde
-        this.serdes = new StateSerdes<>(name,
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        this.serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         // register the store
         context.register(root, true, new StateRestoreCallback() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
index db64621..8ecc654 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -38,7 +38,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted
     MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                           final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
                                           final StateSerdes<K, AGG> serdes) {
-        super(cacheIterator, storeIterator, new StateSerdes<>(serdes.stateName(),
+        super(cacheIterator, storeIterator, new StateSerdes<>(serdes.topic(),
                                                               new SessionKeySerde<>(serdes.keySerde()),
                                                               serdes.valueSerde()));
 
@@ -53,7 +53,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted
 
     @Override
     Windowed<K> deserializeCacheKey(final Bytes cacheKey) {
-        return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer());
+        return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer(), rawSerdes.topic());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 31956ba..ef6ea3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.List;
@@ -95,6 +96,8 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public void init(ProcessorContext context, StateStore root) {
         this.context = context;
 
+        keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()));
+
         segments.openExisting(context);
 
         // register and possibly restore the state from the logs

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index a8ddc73..22f4a9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -34,6 +35,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
     private final Serde<AGG> aggSerde;
     private final SegmentedBytesStore bytesStore;
     private StateSerdes<K, AGG> serdes;
+    protected String topic;
 
 
     RocksDBSessionStore(final SegmentedBytesStore bytesStore,
@@ -55,12 +57,12 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
 
     @Override
     public void remove(final Windowed<K> key) {
-        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer()));
+        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic));
     }
 
     @Override
     public void put(final Windowed<K> sessionKey, final AGG aggregate) {
-        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate));
+        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), aggSerde.serializer().serialize(bytesStore.name(), aggregate));
     }
 
     @Override
@@ -71,7 +73,9 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
-        this.serdes = new StateSerdes<>(bytesStore.name(),
+        final String storeName = bytesStore.name();
+        topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+        this.serdes = new StateSerdes<>(topic,
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
@@ -121,7 +125,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
         @Override
         public Windowed<K> peekNextKey() {
             final Bytes bytes = bytesIterator.peekNextKey();
-            return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer());
+            return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer(), serdes.topic());
         }
 
         @Override
@@ -132,7 +136,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
         @Override
         public KeyValue<Windowed<K>, AGG> next() {
             final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-            return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value));
+            return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6bb14dd..acfb5b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -141,9 +142,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
         // we need to construct the serde while opening DB since
         // it is also triggered by windowed DB segments without initialization
-        this.serdes = new StateSerdes<>(name,
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        this.serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 80c4796..ffd3061 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -69,7 +70,7 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
         // construct the serde
-        this.serdes = new StateSerdes<>(bytesStore.name(),
+        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index ab1099e..b2957da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -67,6 +67,14 @@ public interface SegmentedBytesStore extends StateStore {
 
 
     interface KeySchema {
+
+        /**
+         * Initialized the schema with a topic.
+         *
+         * @param topic a topic name
+         */
+        void init(final String topic);
+
         /**
          * Given a record-key and a time, construct a Segmented key that represents
          * the upper range of keys to search when performing range queries.

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 3747d0f..698970a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -27,17 +27,23 @@ import java.util.List;
 
 
 class SessionKeySchema implements SegmentedBytesStore.KeySchema {
+    private String topic;
+
+    @Override
+    public void init(final String topic) {
+        this.topic = topic;
+    }
 
     @Override
     public Bytes upperRange(final Bytes key, final long to) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
-        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
+        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
     }
 
     @Override
     public Bytes lowerRange(final Bytes key, final long from) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
-        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
+        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
index a4d347c..9d32592 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
@@ -24,7 +24,12 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.List;
 
 class WindowStoreKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
-    private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
+    private StateSerdes<Bytes, byte[]> serdes;
+
+    @Override
+    public void init(final String topic) {
+        serdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray());
+    }
 
     @Override
     public Bytes upperRange(final Bytes key, final long to) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index 074cf8a..0f491ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -34,9 +34,12 @@ public class WindowStoreUtils {
     /** Inner byte array serde used for segments */
     public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
     public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
-    public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
 
 
+    static StateSerdes<Bytes, byte[]> getInnerStateSerde(final String topic) {
+        return new StateSerdes<>(topic, INNER_KEY_SERDE, INNER_VALUE_SERDE);
+    }
+
     public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);
         return toBinaryKey(serializedKey, timestamp, seqnum);

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
index 3ccdc2c..5c70c8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -58,29 +58,29 @@ public class SessionKeySerdeTest {
     @Test
     public void shouldConvertToBinaryAndBack() throws Exception {
         final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 20));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
-        final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer());
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic");
+        final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "topic");
         assertEquals(key, result);
     }
 
     @Test
     public void shouldExtractEndTimeFromBinary() throws Exception {
         final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 100));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic");
         assertEquals(100, SessionKeySerde.extractEnd(serialized.get()));
     }
 
     @Test
     public void shouldExtractStartTimeFromBinary() throws Exception {
         final Windowed<String> key = new Windowed<>("key", new SessionWindow(50, 100));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic");
         assertEquals(50, SessionKeySerde.extractStart(serialized.get()));
     }
 
     @Test
     public void shouldExtractKeyBytesFromBinary() throws Exception {
         final Windowed<String> key = new Windowed<>("blah", new SessionWindow(50, 100));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic");
         assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 0b6288f..efb3fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -70,7 +70,7 @@ public class WindowedStreamPartitionerTest {
         DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
 
         WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
-        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
+        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer);
 
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 2f5b368..1f2665a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -170,10 +170,10 @@ public class KeyValueStoreTestDriver<K, V> {
      * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
      * @return the test driver; never null
      */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer,
-                                                              Deserializer<K> keyDeserializer,
-                                                              Serializer<V> valueSerializer,
-                                                              Deserializer<V> valueDeserializer) {
+    public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> keySerializer,
+                                                              final Deserializer<K> keyDeserializer,
+                                                              final Serializer<V> valueSerializer,
+                                                              final Deserializer<V> valueDeserializer) {
         StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected",
             Serdes.serdeFrom(keySerializer, keyDeserializer),
             Serdes.serdeFrom(valueSerializer, valueDeserializer));

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 0475031..e12d693 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -117,7 +117,7 @@ public class CachingSessionStoreTest {
         assertEquals(added.size() - 1, cache.size());
         final KeyValueIterator<Bytes, byte[]> iterator = underlying.fetch(Bytes.wrap(added.get(0).key.key().getBytes()), 0, 0);
         final KeyValue<Bytes, byte[]> next = iterator.next();
-        assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()));
+        assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "topic"));
         assertArrayEquals(serdes.rawValue(added.get(0).value), next.value);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 11e605c..d50b551 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -162,7 +162,7 @@ public class CachingWindowStoreTest {
     @Test
     public void shouldIterateCacheAndStore() throws Exception {
         final Bytes key = Bytes.wrap("1" .getBytes());
-        underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES)), "a".getBytes());
+        underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("topic"))), "a".getBytes());
         cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
         final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
         assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 0fd6001..579f129 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -59,7 +59,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     private KeyValueStore<String, String> newStoreInstance() {
-        return StateStoreTestUtils.newKeyValueStore(storeName, String.class, String.class);
+        return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
index e7c2eb3..3cc217d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
@@ -46,7 +46,7 @@ public class MergedSortedCacheSessionStoreIteratorTest {
     private final SessionWindow cacheWindow = new SessionWindow(10, 20);
     private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair(
             SessionKeySerde.toBinary(
-                    new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes())))
+                    new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "topic"), new LRUCacheEntry(cacheKey.getBytes())))
             .iterator();
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 3763290..e9c8f89 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -155,7 +155,7 @@ public class RocksDBSegmentedBytesStoreTest {
     }
 
     private Bytes serializeKey(final Windowed<String> key) {
-        return SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        return SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic");
     }
 
     private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
@@ -163,7 +163,7 @@ public class RocksDBSegmentedBytesStoreTest {
         while (iterator.hasNext()) {
             final KeyValue<Bytes, byte[]> next = iterator.next();
             final KeyValue<Windowed<String>, Long> deserialized
-                    = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()), Serdes.Long().deserializer().deserialize("", next.value));
+                    = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "topic"), Serdes.Long().deserializer().deserialize("", next.value));
             results.add(deserialized);
         }
         return results;

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index 5fc9e1f..bb95827 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
@@ -28,7 +29,7 @@ import java.util.Collections;
 @SuppressWarnings("unchecked")
 public class StateStoreTestUtils {
 
-    public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) {
+    public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, String applicationId, Class<K> keyType, Class<V> valueType) {
         final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name,
                                                                                                  null,
                                                                                                  null,
@@ -37,8 +38,14 @@ public class StateStoreTestUtils {
                                                                                                  Collections.<String, String>emptyMap());
 
         final StateStore stateStore = supplier.get();
-        stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType),
-                new NoOpRecordCollector()), stateStore);
+        stateStore.init(
+            new MockProcessorContext(
+                StateSerdes.withBuiltinTypes(
+                    ProcessorStateManager.storeChangelogTopic(applicationId, name),
+                    keyType,
+                    valueType),
+                new NoOpRecordCollector()),
+            stateStore);
         return (KeyValueStore<K, V>) stateStore;
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index 708e153..85270ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -42,9 +42,9 @@ public class WrappingStoreProviderTest {
         final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
 
 
-        stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
+        stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class));
         stubProviderOne.addStore("window", new NoOpWindowStore());
-        stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
+        stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class));
         stubProviderTwo.addStore("window", new NoOpWindowStore());
 
         wrappingStoreProvider = new WrappingStoreProvider(


Mime
View raw message