kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (#4976)
Date Thu, 10 May 2018 00:13:09 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b1a118  KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (#4976)
0b1a118 is described below

commit 0b1a118f45418aba6af03e71e7169e38cb3ec9af
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Wed May 9 17:13:05 2018 -0700

    KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (#4976)
    
    1. Remove the deprecated StateStoreSuppliers and the corresponding Stores.create() functions and factories. Only the base StateStoreSupplier and MockStoreSupplier are preserved, as they are still needed by the deprecated TopologyBuilder and KStreamBuilder; they will be removed in a follow-up PR.
    
    2. Add TopologyWrapper.java to replace the original InternalTopologyBuilderAccessor, which was removed but turns out to still be needed for now.
    
    3. Minor: remove StateStoreTestUtils.java and inline its logic in its callers, since with StoreBuilder it is now just a one-liner.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/kstream/internals/AbstractStream.java  |  39 --
 .../org/apache/kafka/streams/state/Stores.java     | 414 ---------------------
 .../internals/InMemoryKeyValueStoreSupplier.java   |  53 ---
 .../internals/InMemoryLRUCacheStoreSupplier.java   |  51 ---
 .../streams/state/internals/MemoryLRUCache.java    |   3 -
 .../internals/RocksDBKeyValueStoreSupplier.java    |   1 -
 .../internals/RocksDBSessionStoreSupplier.java     |  65 ----
 .../streams/state/internals/RocksDBStore.java      |   2 -
 .../internals/RocksDBWindowStoreSupplier.java      |  75 ----
 .../RocksDbSessionBytesStoreSupplier.java          |   8 +-
 .../internals/RocksDbWindowBytesStoreSupplier.java |   7 +-
 .../kafka/streams/state/internals/ThreadCache.java |   2 -
 .../org/apache/kafka/streams/TopologyWrapper.java  |  34 ++
 .../integration/RegexSourceIntegrationTest.java    |  23 +-
 .../kafka/streams/kstream/KStreamBuilderTest.java  |   2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |  22 +-
 .../streams/processor/TopologyBuilderTest.java     |  28 +-
 .../internals/InternalTopologyBuilderTest.java     | 102 +++--
 .../processor/internals/ProcessorTopologyTest.java | 169 +++++----
 .../processor/internals/StandbyTaskTest.java       |   1 -
 .../internals/StreamsPartitionAssignorTest.java    |  13 +-
 .../org/apache/kafka/streams/state/StoresTest.java |  69 +---
 .../CompositeReadOnlyKeyValueStoreTest.java        |  17 +-
 .../RocksDBKeyValueStoreSupplierTest.java          | 162 --------
 .../internals/RocksDBSessionStoreSupplierTest.java | 139 -------
 .../internals/RocksDBWindowStoreSupplierTest.java  | 175 ---------
 .../state/internals/StateStoreTestUtils.java       |  56 ---
 .../StreamThreadStateStoreProviderTest.java        |  29 +-
 .../state/internals/WrappingStoreProviderTest.java |  12 +-
 29 files changed, 249 insertions(+), 1524 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 3c65399..497bdac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.internals.Topic;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -26,12 +24,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.HashSet;
 import java.util.Objects;
@@ -81,38 +74,6 @@ public abstract class AbstractStream<K> {
         };
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
-    static <T, K>  org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
-                                                                   final Serde<T> aggValueSerde,
-                                                                   final String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
-        return storeFactory(keySerde, aggValueSerde, storeName).build();
-    }
-
-    @SuppressWarnings({"unchecked", "deprecation"})
-    static  <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
-                                                                                   final Serde<T> aggValSerde,
-                                                                                   final Windows<W> windows,
-                                                                                   final String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
-        return storeFactory(keySerde, aggValSerde, storeName)
-                .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
-                .build();
-    }
-
-    @SuppressWarnings("deprecation")
-    static  <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
-                                                                       final Serde<T> aggValueSerde,
-                                                                       final String storeName) {
-        return Stores.create(storeName)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .enableCaching();
-    }
-
     static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
         Objects.requireNonNull(valueMapper, "valueMapper can't be null");
         return new ValueMapperWithKey<K, V, VR>() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index daa2915..27b985b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -22,13 +22,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
-import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
-import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
@@ -37,9 +32,6 @@ import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -244,411 +236,5 @@ public class Stores {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
     }
-
-    /**
-     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
-     *
-     * @param name the name of the store
-     * @return the factory that can be used to specify other options or configurations for the store; never null
-     * @deprecated use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
-     * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)}
-     */
-    @Deprecated
-    public static StoreFactory create(final String name) {
-        return new StoreFactory() {
-            @Override
-            public <K> ValueFactory<K> withKeys(final Serde<K> keySerde) {
-                return new ValueFactory<K>() {
-                    @Override
-                    public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) {
-
-                        return new KeyValueFactory<K, V>() {
-
-                            @Override
-                            public InMemoryKeyValueFactory<K, V> inMemory() {
-                                return new InMemoryKeyValueFactory<K, V>() {
-                                    private int capacity = Integer.MAX_VALUE;
-                                    private final Map<String, String> logConfig = new HashMap<>();
-                                    private boolean logged = true;
-
-                                    /**
-                                     * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
-                                     * @throws IllegalArgumentException if the capacity of the store is zero or negative
-                                     */
-                                    @Override
-                                    public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) {
-                                        if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive");
-                                        this.capacity = capacity;
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config) {
-                                        logged = true;
-                                        logConfig.putAll(config);
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public InMemoryKeyValueFactory<K, V> disableLogging() {
-                                        logged = false;
-                                        logConfig.clear();
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
-                                        log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged);
-                                        if (capacity < Integer.MAX_VALUE) {
-                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
-                                        }
-                                        return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig);
-                                    }
-                                };
-                            }
-
-                            @Override
-                            public PersistentKeyValueFactory<K, V> persistent() {
-                                return new PersistentKeyValueFactory<K, V>() {
-                                    boolean cachingEnabled;
-                                    private long windowSize;
-                                    private final Map<String, String> logConfig = new HashMap<>();
-                                    private int numSegments = 0;
-                                    private long retentionPeriod = 0L;
-                                    private boolean retainDuplicates = false;
-                                    private boolean sessionWindows;
-                                    private boolean logged = true;
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> windowed(final long windowSize, final long retentionPeriod, final int numSegments, final boolean retainDuplicates) {
-                                        if (numSegments < RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
-                                            throw new IllegalArgumentException("numSegments must be >= " + RocksDBWindowStoreSupplier.MIN_SEGMENTS);
-                                        }
-                                        this.windowSize = windowSize;
-                                        this.numSegments = numSegments;
-                                        this.retentionPeriod = retentionPeriod;
-                                        this.retainDuplicates = retainDuplicates;
-                                        this.sessionWindows = false;
-
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod) {
-                                        this.sessionWindows = true;
-                                        this.retentionPeriod = retentionPeriod;
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config) {
-                                        logged = true;
-                                        logConfig.putAll(config);
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> disableLogging() {
-                                        logged = false;
-                                        logConfig.clear();
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> enableCaching() {
-                                        cachingEnabled = true;
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
-                                        log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
-                                        if (sessionWindows) {
-                                            return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
-                                        } else if (numSegments > 0) {
-                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled);
-                                        }
-                                        return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig, cachingEnabled);
-                                    }
-
-                                };
-                            }
-
-
-                        };
-                    }
-                };
-            }
-        };
-    }
-
-
-    public static abstract class StoreFactory {
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<String> withStringKeys() {
-            return withKeys(Serdes.String());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Integer> withIntegerKeys() {
-            return withKeys(Serdes.Integer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Long> withLongKeys() {
-            return withKeys(Serdes.Long());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Double}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Double> withDoubleKeys() {
-            return withKeys(Serdes.Double());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link ByteBuffer}.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<ByteBuffer> withByteBufferKeys() {
-            return withKeys(Serdes.ByteBuffer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<byte[]> withByteArrayKeys() {
-            return withKeys(Serdes.ByteArray());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys.
-         *
-         * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes
-         * @return the interface used to specify the type of values; never null
-         */
-        public <K> ValueFactory<K> withKeys(Class<K> keyClass) {
-            return withKeys(Serdes.serdeFrom(keyClass));
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
-         *
-         * @param keySerde  the serialization factory for keys; may be null
-         * @return          the interface used to specify the type of values; never null
-         */
-        public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde);
-    }
-
-    /**
-     * The factory for creating off-heap key-value stores.
-     *
-     * @param <K> the type of keys
-     */
-    public static abstract class ValueFactory<K> {
-        /**
-         * Use {@link String} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, String> withStringValues() {
-            return withValues(Serdes.String());
-        }
-
-        /**
-         * Use {@link Integer} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Integer> withIntegerValues() {
-            return withValues(Serdes.Integer());
-        }
-
-        /**
-         * Use {@link Long} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Long> withLongValues() {
-            return withValues(Serdes.Long());
-        }
-
-        /**
-         * Use {@link Double} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Double> withDoubleValues() {
-            return withValues(Serdes.Double());
-        }
-
-        /**
-         * Use {@link ByteBuffer} for values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, ByteBuffer> withByteBufferValues() {
-            return withValues(Serdes.ByteBuffer());
-        }
-
-        /**
-         * Use byte arrays for values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, byte[]> withByteArrayValues() {
-            return withValues(Serdes.ByteArray());
-        }
-
-        /**
-         * Use values of the specified type.
-         *
-         * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) {
-            return withValues(Serdes.serdeFrom(valueClass));
-        }
-
-        /**
-         * Use the specified serializer and deserializer for the values.
-         *
-         * @param valueSerde    the serialization factory for values; may be null
-         * @return              the interface used to specify the remaining key-value store options; never null
-         */
-        public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde);
-    }
-
-
-    public interface KeyValueFactory<K, V> {
-        /**
-         * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
-         * read to restore the entries if they are lost.
-         *
-         * @return the factory to create in-memory key-value stores; never null
-         */
-        InMemoryKeyValueFactory<K, V> inMemory();
-
-        /**
-         * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
-         * topic that can be read to restore the entries if they are lost.
-         *
-         * @return the factory to create persistent key-value stores; never null
-         */
-        PersistentKeyValueFactory<K, V> persistent();
-    }
-
-    /**
-     * The interface used to create in-memory key-value stores.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
-    @Deprecated
-    public interface InMemoryKeyValueFactory<K, V> {
-        /**
-         * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
-         * equivalent to not placing a limit on the number of entries.
-         *
-         * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
-         * @return this factory
-         * @throws IllegalArgumentException if the capacity is not positive
-         */
-        InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
-
-        /**
-         * Indicates that a changelog should be created for the store. The changelog will be created
-         * with the provided cleanupPolicy and configs.
-         *
-         * Note: Any unrecognized configs will be ignored.
-         * @param config    any configs that should be applied to the changelog
-         * @return  the factory to create an in-memory key-value store
-         */
-        InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config);
-
-        /**
-         * Indicates that a changelog should not be created for the key-value store
-         * @return the factory to create an in-memory key-value store
-         */
-        InMemoryKeyValueFactory<K, V> disableLogging();
-
-
-        /**
-         * Return the instance of StateStoreSupplier of new key-value store.
-         * @return the state store supplier; never null
-         */
-        org.apache.kafka.streams.processor.StateStoreSupplier build();
-    }
-
-    /**
-     * The interface used to create off-heap key-value stores that use a local database.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
-    @Deprecated
-    public interface PersistentKeyValueFactory<K, V> {
-
-        /**
-         * Set the persistent store as a windowed key-value store
-         * @param windowSize size of the windows
-         * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
-         * @param numSegments the maximum number of segments for rolling the windowed store
-         * @param retainDuplicates whether or not to retain duplicate data within the window
-         */
-        PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);
-
-        /**
-         * Set the persistent store as a {@link SessionStore} for use with {@link org.apache.kafka.streams.kstream.SessionWindows}
-         * @param retentionPeriod period of time in milliseconds to keep each window in this store
-         */
-        PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod);
-
-        /**
-         * Indicates that a changelog should be created for the store. The changelog will be created
-         * with the provided cleanupPolicy and configs.
-         *
-         * Note: Any unrecognized configs will be ignored.
-         * @param config            any configs that should be applied to the changelog
-         * @return  the factory to create a persistent key-value store
-         */
-        PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config);
-
-        /**
-         * Indicates that a changelog should not be created for the key-value store
-         * @return the factory to create a persistent key-value store
-         */
-        PersistentKeyValueFactory<K, V> disableLogging();
-
-        /**
-         * Caching should be enabled on the created store.
-         * @return the factory to create a persistent key-value store
-         */
-        PersistentKeyValueFactory<K, V> enableCaching();
-
-        /**
-         * Return the instance of StateStoreSupplier of new key-value store.
-         * @return the key-value store; never null
-         */
-        org.apache.kafka.streams.processor.StateStoreSupplier build();
-
-    }
 }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index f955421..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
- * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
- * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
-
-    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
-        this(name, keySerde, valueSerde, null, logged, logConfig);
-    }
-
-    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
-        super(name, keySerde, valueSerde, time, logged, logConfig);
-    }
-
-    public KeyValueStore get() {
-        InMemoryKeyValueStore<K, V> store = new InMemoryKeyValueStore<>(name, keySerde, valueSerde);
-
-        return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "in-memory-state", time);
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index 0f897ba..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-@Deprecated
-public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
-
-    private final int capacity;
-
-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
-        this(name, capacity, keySerde, valueSerde, null, logged, logConfig);
-    }
-
-    private InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
-        super(name, keySerde, valueSerde, time, logged, logConfig);
-        this.capacity = capacity;
-    }
-
-    public KeyValueStore get() {
-        MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
-        return new MeteredKeyValueStore<>(logged ? cache.enableLogging() : cache, "in-memory-lru-state", time);
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index b99c907..1957aa4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -37,12 +37,9 @@ import java.util.Objects;
  *  * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
  * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
  * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
-
  *
  * @param <K> The key type
  * @param <V> The value type
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 4b233f0..3bc56c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -27,7 +27,6 @@ import java.util.Map;
  *
  * @param <K> the type of keys
  * @param <V> the type of values
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 @Deprecated
 public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
deleted file mode 100644
index 1552f7d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.SessionStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
-
-    static final int NUM_SEGMENTS = 3;
-    private final long retentionPeriod;
-    private final SessionStoreBuilder<K, V> builder;
-
-    public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
-        super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
-        this.retentionPeriod = retentionPeriod;
-        builder = new SessionStoreBuilder<>(new RocksDbSessionBytesStoreSupplier(name,
-                                                                                 retentionPeriod),
-                                            keySerde,
-                                            valueSerde,
-                                            time);
-        if (cached) {
-            builder.withCachingEnabled();
-        }
-        // logged by default so we only need to worry about when it is disabled.
-        if (!logged) {
-            builder.withLoggingDisabled();
-        }
-    }
-
-    public SessionStore<K, V> get() {
-        return builder.build();
-
-    }
-
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 2813041..d2b8cd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -62,8 +62,6 @@ import java.util.Set;
  * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
  * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
  * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
deleted file mode 100644
index 2a82f79..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
-    public static final int MIN_SEGMENTS = 2;
-    private final long retentionPeriod;
-    private WindowStoreBuilder<K, V> builder;
-
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
-        this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, Time.SYSTEM, windowSize, logged, logConfig, enableCaching);
-    }
-
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
-        super(name, keySerde, valueSerde, time, logged, logConfig);
-        if (numSegments < MIN_SEGMENTS) {
-            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
-        }
-        this.retentionPeriod = retentionPeriod;
-        builder = new WindowStoreBuilder<>(new RocksDbWindowBytesStoreSupplier(name,
-                                                                               retentionPeriod,
-                                                                               numSegments,
-                                                                               windowSize,
-                                                                               retainDuplicates),
-                                           keySerde,
-                                           valueSerde,
-                                           time);
-        if (enableCaching) {
-            builder.withCachingEnabled();
-        }
-        // logged by default so we only need to worry about when it is disabled.
-        if (!logged) {
-            builder.withLoggingDisabled();
-        }
-    }
-
-    public WindowStore<K, V> get() {
-        return builder.build();
-    }
-
-    @Override
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index b9b7181..5a87bc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -25,6 +25,8 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
     private final String name;
     private final long retentionPeriod;
 
+    private static final int NUM_SEGMENTS = 3;
+
     public RocksDbSessionBytesStoreSupplier(final String name,
                                             final long retentionPeriod) {
         this.name = name;
@@ -36,13 +38,12 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         return name;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public SessionStore<Bytes, byte[]> get() {
         final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
             name,
             retentionPeriod,
-            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS,
+            NUM_SEGMENTS,
             new SessionKeySchema());
         return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
     }
@@ -52,11 +53,10 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         return "rocksdb-session";
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public long segmentIntervalMs() {
         return Segments.segmentInterval(
             retentionPeriod,
-            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS);
+            NUM_SEGMENTS);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index e1521f8..5fbf491 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -28,14 +28,15 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
     private final long windowSize;
     private final boolean retainDuplicates;
 
-    @SuppressWarnings("deprecation")
+    private static final int MIN_SEGMENTS = 2;
+
     public RocksDbWindowBytesStoreSupplier(final String name,
                                            final long retentionPeriod,
                                            final int segments,
                                            final long windowSize,
                                            final boolean retainDuplicates) {
-        if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
-            throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS);
+        if (segments < MIN_SEGMENTS) {
+            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
         }
         this.name = name;
         this.retentionPeriod = retentionPeriod;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index b947664..8c3716b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -33,8 +33,6 @@ import java.util.NoSuchElementException;
 /**
  * An in-memory LRU cache store similar to {@link MemoryLRUCache} but byte-based, not
  * record based
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 public class ThreadCache {
     private final Logger log;
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
new file mode 100644
index 0000000..f106766
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+/**
+ *  This class allows access to the {@link InternalTopologyBuilder} of a {@link Topology} object.
+ *
+ */
+public class TopologyWrapper extends Topology {
+
+    public InternalTopologyBuilder getInternalBuilder() {
+        return internalTopologyBuilder;
+    }
+
+    public void setApplicationId(String applicationId) {
+        internalTopologyBuilder.setApplicationId(applicationId);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index d0361dc..e5160e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -29,17 +29,17 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -136,8 +136,6 @@ public class RegexSourceIntegrationTest {
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
-
         CLUSTER.createTopic("TEST-TOPIC-1");
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -227,28 +225,27 @@ public class RegexSourceIntegrationTest {
         }, STREAM_TASKS_NOT_UPDATED);
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
 
         final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-        final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("testStateStore"), Serdes.String(), Serdes.String());
         final long thirtySecondTimeout = 30 * 1000;
 
-        final TopologyBuilder builder = new TopologyBuilder()
-                .addSource("ingest", Pattern.compile("topic-\\d+"))
-                .addProcessor("my-processor", processorSupplier, "ingest")
-                .addStateStore(stateStoreSupplier, "my-processor");
+        final TopologyWrapper topology = new TopologyWrapper();
+        topology.addSource("ingest", Pattern.compile("topic-\\d+"));
+        topology.addProcessor("my-processor", processorSupplier, "ingest");
+        topology.addStateStore(storeBuilder, "my-processor");
 
+        streams = new KafkaStreams(topology, streamsConfiguration);
 
-        streams = new KafkaStreams(builder, streamsConfiguration);
         try {
             streams.start();
 
             final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    final Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+                    final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
                     final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
                     return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 255c3eb..27f0833 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 8cb2eae..afc9be1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -31,7 +31,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -43,7 +44,6 @@ import org.junit.Test;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -106,16 +106,16 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     private void initStore(final boolean enableCaching) {
-        final RocksDBSessionStoreSupplier<String, Long> supplier =
-            new RocksDBSessionStoreSupplier<>(
-                STORE_NAME,
-                GAP_MS * 3,
+        final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, GAP_MS * 3),
                 Serdes.String(),
-                Serdes.Long(),
-                false,
-                Collections.<String, String>emptyMap(),
-                enableCaching);
-        sessionStore = supplier.get();
+                Serdes.Long())
+                .withLoggingDisabled();
+
+        if (enableCaching) {
+            storeBuilder.withCachingEnabled();
+        }
+
+        sessionStore = storeBuilder.build();
         sessionStore.init(context, sessionStore);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d1d25e9..93b233b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -31,8 +31,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -547,23 +545,6 @@ public class TopologyBuilderTest {
         assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldAddInternalTopicConfigForWindowStores() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("appId");
-        builder.addSource("source", "topic");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
-        final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-        final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(2, properties.size());
-        assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
-        assertEquals("appId-store-changelog", topicConfig.name());
-    }
-
     @Test
     public void shouldAddInternalTopicConfigForNonWindowStores() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -594,7 +575,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+    public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -603,12 +584,11 @@ public class TopologyBuilderTest {
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
 
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource(sourceNodeName, "topic")
                 .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
-                .addStateStore(Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName)
+                .addStateStore(new MockStateStoreSupplier(LocalMockProcessorSupplier.STORE_NAME, false), goodNodeName)
                 .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         try {
             final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, config);
@@ -724,6 +704,7 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
 
@@ -755,10 +736,11 @@ public class TopologyBuilderTest {
         assertFalse(topics.contains("topic-A"));
     }
 
+    @SuppressWarnings("unchecked")
     @Test(expected = TopologyBuilderException.class)
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
-        final TopologyBuilder topologyBuilder = new TopologyBuilder()
+        new TopologyBuilder()
             .addGlobalStore(new MockStateStoreSupplier("anyName", false, false),
                 sameNameForSourceAndProcessor,
                 null,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 149a158..c73593e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -29,11 +29,10 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -63,8 +62,9 @@ import static org.junit.Assert.fail;
 
 public class InternalTopologyBuilderTest {
 
-    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private final Serde<String> stringSerde = Serdes.String();
+    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+    private final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.ByteArray(), Serdes.ByteArray());
 
     @Test
     public void shouldAddSourceWithOffsetReset() {
@@ -266,14 +266,14 @@ public class InternalTopologyBuilderTest {
 
     @Test(expected = TopologyException.class)
     public void testAddStateStoreWithNonExistingProcessor() {
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+        builder.addStateStore(storeBuilder, "no-such-processsor");
     }
 
     @Test
     public void testAddStateStoreWithSource() {
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         try {
-            builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+            builder.addStateStore(storeBuilder, "source-1");
             fail("Should throw TopologyException with store cannot be added to source");
         } catch (final TopologyException expected) { /* ok */ }
     }
@@ -282,36 +282,34 @@ public class InternalTopologyBuilderTest {
     public void testAddStateStoreWithSink() {
         builder.addSink("sink-1", "topic-1", null, null, null);
         try {
-            builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+            builder.addStateStore(storeBuilder, "sink-1");
             fail("Should throw TopologyException with store cannot be added to sink");
         } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
     public void testAddStateStoreWithDuplicates() {
-        builder.addStateStore(new MockStateStoreSupplier("store", false));
+        builder.addStateStore(storeBuilder);
         try {
-            builder.addStateStore(new MockStateStoreSupplier("store", false));
+            builder.addStateStore(storeBuilder);
             fail("Should throw TopologyException with store name conflict");
         } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void testAddStateStore() {
-        final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
-        builder.addStateStore(supplier);
+        builder.addStateStore(storeBuilder);
         builder.setApplicationId("X");
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
 
         assertEquals(0, builder.build(null).stateStores().size());
 
-        builder.connectProcessorAndStateStores("processor-1", "store-1");
+        builder.connectProcessorAndStateStores("processor-1", storeBuilder.name());
 
         final List<StateStore> suppliers = builder.build(null).stateStores();
         assertEquals(1, suppliers.size());
-        assertEquals(supplier.name(), suppliers.get(0).name());
+        assertEquals(storeBuilder.name(), suppliers.get(0).name());
     }
 
     @Test
@@ -346,7 +344,6 @@ public class InternalTopologyBuilderTest {
         assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void testTopicGroupsByStateStore() {
         builder.setApplicationId("X");
@@ -358,15 +355,14 @@ public class InternalTopologyBuilderTest {
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1", "processor-2");
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
         builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-3", "processor-4");
 
         builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
-        final StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
-        builder.addStateStore(supplier);
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-3"), Serdes.ByteArray(), Serdes.ByteArray()));
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
@@ -415,17 +411,17 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingSink() {
         builder.addSink(null, "topic", null, null, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+    public void shouldNotAllowNullTopicWhenAddingSink() {
         builder.addSink("name", null, null, null, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingProcessor() {
         builder.addProcessor(null, new ProcessorSupplier() {
             @Override
             public Processor get() {
@@ -435,39 +431,38 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorSupplier() throws Exception {
+    public void shouldNotAllowNullProcessorSupplier() {
         builder.addProcessor("name", null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingSource() {
         builder.addSource(null, null, null, null, null, Pattern.compile(".*"));
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
+    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
         builder.connectProcessorAndStateStores(null, "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws Exception {
+    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() {
         builder.connectProcessorAndStateStores("processor", new String[]{null});
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullInternalTopic() throws Exception {
+    public void shouldNotAddNullInternalTopic() {
         builder.addInternalTopic(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotSetApplicationIdToNull() throws Exception {
+    public void shouldNotSetApplicationIdToNull() {
         builder.setApplicationId(null);
     }
 
-    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullStateStoreSupplier() throws Exception {
-        builder.addStateStore((StateStoreSupplier) null);
+    public void shouldNotAddNullStateStoreSupplier() {
+        builder.addStateStore((StoreBuilder) null);
     }
 
     private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {
@@ -479,44 +474,43 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
     @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
     @Test
-    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+    public void shouldCorrectlyMapStateStoreToInternalTopics() {
         builder.setApplicationId("appId");
         builder.addInternalTopic("internal-topic");
         builder.addSource(null, "source", null, null, null, "internal-topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigForWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigForWindowStores() {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
+        builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor");
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
@@ -528,13 +522,12 @@ public class InternalTopologyBuilderTest {
         assertTrue(topicConfig instanceof WindowedChangelogTopicConfig);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigForNonWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigForNonWindowStores() {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", true), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
@@ -545,9 +538,8 @@ public class InternalTopologyBuilderTest {
         assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception {
+    public void shouldAddInternalTopicConfigForRepartitionTopics() {
         builder.setApplicationId("appId");
         builder.addInternalTopic("foo");
         builder.addSource(null, "source", null, null, null, "foo");
@@ -561,7 +553,6 @@ public class InternalTopologyBuilderTest {
         assertTrue(topicConfig instanceof RepartitionTopicConfig);
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
@@ -572,12 +563,11 @@ public class InternalTopologyBuilderTest {
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
 
         builder.addSource(null, sourceNodeName, null, null, null, "topic");
         builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         builder.addStateStore(
-            Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LocalMockProcessorSupplier.STORE_NAME), Serdes.String(), Serdes.String()),
             goodNodeName);
         builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         
@@ -641,17 +631,15 @@ public class InternalTopologyBuilderTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddTimestampExtractorPerSource() throws Exception {
+    public void shouldAddTimestampExtractorPerSource() {
         builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
         final ProcessorTopology processorTopology = builder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
+    public void shouldAddTimestampExtractorWithPatternPerSource() {
         final Pattern pattern = Pattern.compile("t.*");
         builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
         final ProcessorTopology processorTopology = builder.build(null);
@@ -659,7 +647,7 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test
-    public void shouldSortProcessorNodesCorrectly() throws Exception {
+    public void shouldSortProcessorNodesCorrectly() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
@@ -702,11 +690,12 @@ public class InternalTopologyBuilderTest {
         assertEquals(1, node.size);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
         builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+"));
         builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest");
-        builder.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
+        builder.addStateStore(storeBuilder, "my-processor");
 
         final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates();
         final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
@@ -722,7 +711,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("test-app");
 
         final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics();
-        final List<String> topics = stateStoreAndTopics.get("testStateStore");
+        final List<String> topics = stateStoreAndTopics.get(storeBuilder.name());
 
         assertTrue("Expected to contain two topics", topics.size() == 2);
 
@@ -731,11 +720,12 @@ public class InternalTopologyBuilderTest {
         assertFalse(topics.contains("topic-A"));
     }
 
+    @SuppressWarnings("unchecked")
     @Test(expected = TopologyException.class)
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
         builder.addGlobalStore(
-            new MockStateStoreSupplier("anyName", false, false),
+            (StoreBuilder<KeyValueStore>) storeBuilder,
             sameNameForSourceAndProcessor,
             null,
             null,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 51d4e05..9f2b242 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -25,15 +25,14 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -64,7 +63,7 @@ public class ProcessorTopologyTest {
     private static final String OUTPUT_TOPIC_2 = "output-topic-2";
     private static final String THROUGH_TOPIC_1 = "through-topic-1";
 
-    private final TopologyBuilder builder = new TopologyBuilder();
+    private final TopologyWrapper topology = new TopologyWrapper();
     private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
 
@@ -94,36 +93,36 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testTopologyMetadata() {
-        builder.setApplicationId("X");
+        topology.setApplicationId("X");
 
-        builder.addSource("source-1", "topic-1");
-        builder.addSource("source-2", "topic-2", "topic-3");
-        builder.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2");
-        builder.addSink("sink-1", "topic-3", "processor-1");
-        builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
+        topology.addSource("source-1", "topic-1");
+        topology.addSource("source-2", "topic-2", "topic-3");
+        topology.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
+        topology.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2");
+        topology.addSink("sink-1", "topic-3", "processor-1");
+        topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology topology = builder.build(null);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
 
-        assertEquals(6, topology.processors().size());
+        assertEquals(6, processorTopology.processors().size());
 
-        assertEquals(2, topology.sources().size());
+        assertEquals(2, processorTopology.sources().size());
 
-        assertEquals(3, topology.sourceTopics().size());
+        assertEquals(3, processorTopology.sourceTopics().size());
 
-        assertNotNull(topology.source("topic-1"));
+        assertNotNull(processorTopology.source("topic-1"));
 
-        assertNotNull(topology.source("topic-2"));
+        assertNotNull(processorTopology.source("topic-2"));
 
-        assertNotNull(topology.source("topic-3"));
+        assertNotNull(processorTopology.source("topic-3"));
 
-        assertEquals(topology.source("topic-2"), topology.source("topic-3"));
+        assertEquals(processorTopology.source("topic-2"), processorTopology.source("topic-3"));
     }
 
     @Test
     public void testDrivingSimpleTopology() {
         int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
@@ -144,7 +143,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingMultiplexingTopology() {
-        driver = new TopologyTestDriverWrapper(createMultiplexingTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createMultiplexingTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -166,7 +165,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingMultiplexByNameTopology() {
-        driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -189,7 +188,7 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
-        driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -203,19 +202,17 @@ public class ProcessorTopologyTest {
         assertNull(store.get("key4"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldDriveGlobalStore() {
         final String storeName = "my-store";
-        final StateStoreSupplier storeSupplier = Stores.create(storeName)
-                .withStringKeys().withStringValues().inMemory().disableLogging().build();
         final String global = "global";
         final String topic = "topic";
-        final TopologyBuilder topologyBuilder = this.builder
-                .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
 
-        driver = new TopologyTestDriverWrapper(topologyBuilder.internalTopologyBuilder, props);
-        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+        topology.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()).withLoggingDisabled(),
+                global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
+
+        driver = new TopologyTestDriverWrapper(topology.getInternalBuilder(), props);
+        final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName);
         driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
         driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
         assertEquals("value1", globalStore.get("key1"));
@@ -225,7 +222,7 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingSimpleMultiSourceTopology() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition), props);
 
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -238,7 +235,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingForwardToSourceTopology() {
-        driver = new TopologyTestDriverWrapper(createForwardToSourceTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createForwardToSourceTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -249,7 +246,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingInternalRepartitioningTopology() {
-        driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -260,7 +257,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
-        driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1@1000"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2@2000"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3@3000"));
@@ -274,29 +271,29 @@ public class ProcessorTopologyTest {
 
     @Test
     public void shouldCreateStringWithSourceAndTopics() {
-        builder.addSource("source", "topic1", "topic2");
-        final ProcessorTopology topology = builder.build(null);
-        final String result = topology.toString();
+        topology.addSource("source", "topic1", "topic2");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final String result = processorTopology.toString();
         assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
     }
 
     @Test
     public void shouldCreateStringWithMultipleSourcesAndTopics() {
-        builder.addSource("source", "topic1", "topic2");
-        builder.addSource("source2", "t", "t1", "t2");
-        final ProcessorTopology topology = builder.build(null);
-        final String result = topology.toString();
+        topology.addSource("source", "topic1", "topic2");
+        topology.addSource("source2", "t", "t1", "t2");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final String result = processorTopology.toString();
         assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
         assertThat(result, containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n"));
     }
 
     @Test
     public void shouldCreateStringWithProcessors() {
-        builder.addSource("source", "t")
+        topology.addSource("source", "t")
                 .addProcessor("processor", mockProcessorSupplier, "source")
                 .addProcessor("other", mockProcessorSupplier, "source");
-        final ProcessorTopology topology = builder.build(null);
-        final String result = topology.toString();
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final String result = processorTopology.toString();
         assertThat(result, containsString("\t\tchildren:\t[processor, other]"));
         assertThat(result, containsString("processor:\n"));
         assertThat(result, containsString("other:\n"));
@@ -304,14 +301,14 @@ public class ProcessorTopologyTest {
 
     @Test
     public void shouldRecursivelyPrintChildren() {
-        builder.addSource("source", "t")
+        topology.addSource("source", "t")
                 .addProcessor("processor", mockProcessorSupplier, "source")
                 .addProcessor("child-one", mockProcessorSupplier, "processor")
                 .addProcessor("child-one-one", mockProcessorSupplier, "child-one")
                 .addProcessor("child-two", mockProcessorSupplier, "processor")
                 .addProcessor("child-two-one", mockProcessorSupplier, "child-two");
 
-        final String result = builder.build(null).toString();
+        final String result = topology.getInternalBuilder().build().toString();
         assertThat(result, containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
         assertThat(result, containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
     }
@@ -319,7 +316,7 @@ public class ProcessorTopologyTest {
     @Test
     public void shouldConsiderTimeStamps() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -331,7 +328,7 @@ public class ProcessorTopologyTest {
     @Test
     public void shouldConsiderModifiedTimeStamps() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createTimestampTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createTimestampTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -379,80 +376,92 @@ public class ProcessorTopologyTest {
         };
     }
 
-    private TopologyBuilder createSimpleTopology(final int partition) {
-        return builder
+    private InternalTopologyBuilder createSimpleTopology(final int partition) {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new ForwardingProcessor()), "source")
-            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createTimestampTopology(final int partition) {
-        return builder
+    private InternalTopologyBuilder createTimestampTopology(final int partition) {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new TimestampProcessor()), "source")
-            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createMultiplexingTopology() {
-        return builder
+    private InternalTopologyBuilder createMultiplexingTopology() {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
             .addSink("sink1", OUTPUT_TOPIC_1, "processor")
-            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+            .addSink("sink2", OUTPUT_TOPIC_2, "processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createMultiplexByNameTopology() {
-        return builder
+    private InternalTopologyBuilder createMultiplexByNameTopology() {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
-            .addSink("sink1", OUTPUT_TOPIC_2, "processor");
+            .addSink("sink1", OUTPUT_TOPIC_2, "processor"))
+                .getInternalBuilder();
     }
 
-    private TopologyBuilder createStatefulTopology(final String storeName) {
-        return builder
+    private InternalTopologyBuilder createStatefulTopology(final String storeName) {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
-            .addStateStore(
-                Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
-                "processor"
-            )
-            .addSink("counts", OUTPUT_TOPIC_1, "processor");
+            .addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()), "processor")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createInternalRepartitioningTopology() {
-        return builder
+    private InternalTopologyBuilder createInternalRepartitioningTopology() {
+        final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
             .addSource("source", INPUT_TOPIC_1)
-            .addInternalTopic(THROUGH_TOPIC_1)
             .addSink("sink0", THROUGH_TOPIC_1, "source")
             .addSource("source1", THROUGH_TOPIC_1)
-            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
+            .getInternalBuilder();
+
+        internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
+
+        return internalTopologyBuilder;
     }
 
-    private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
-        return builder
+    private InternalTopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
+        final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
             .addSource("source", INPUT_TOPIC_1)
-            .addInternalTopic(THROUGH_TOPIC_1)
             .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
             .addSink("sink0", THROUGH_TOPIC_1, "processor")
             .addSource("source1", THROUGH_TOPIC_1)
-            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
+            .getInternalBuilder();
+
+        internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
+
+        return internalTopologyBuilder;
     }
 
-    private TopologyBuilder createForwardToSourceTopology() {
-        return builder.addSource("source-1", INPUT_TOPIC_1)
+    private InternalTopologyBuilder createForwardToSourceTopology() {
+        return ((TopologyWrapper) topology.addSource("source-1", INPUT_TOPIC_1)
                 .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
                 .addSource("source-2", OUTPUT_TOPIC_1)
-                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
+                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"))
+                .getInternalBuilder();
     }
 
-    private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
-        return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private InternalTopologyBuilder createSimpleMultiSourceTopology(int partition) {
+        return ((TopologyWrapper) topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                 .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")
                 .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1")
                 .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
                 .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2")
-                .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
+                .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"))
+                .getInternalBuilder();
     }
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 54ea1ce..9090012 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -347,7 +347,6 @@ public class StandbyTaskTest {
         );
         task.initializeStateStores();
 
-
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 4e04b49..0048e73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -40,10 +41,10 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Test;
@@ -328,10 +329,10 @@ public class StreamsPartitionAssignorTest {
     public void testAssignWithPartialTopology() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor2");
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
@@ -469,11 +470,11 @@ public class StreamsPartitionAssignorTest {
         builder.addSource(null, "source2", null, null, null, "topic2");
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1");
 
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
-        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store3"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
 
         List<String> topics = Utils.mkList("topic1", "topic2");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 665ebc0..5383c27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
@@ -25,16 +24,10 @@ import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.Map;
-
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StoresTest {
@@ -104,70 +97,10 @@ public class StoresTest {
         Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .inMemory()
-                .enableLogging(Collections.singletonMap("retention.ms", "1000"))
-                .build();
-
-        final Map<String, String> config = supplier.logConfig();
-        assertTrue(supplier.loggingEnabled());
-        assertEquals("1000", config.get("retention.ms"));
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreateInMemoryStoreSupplierNotLogged() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .inMemory()
-                .disableLogging()
-                .build();
-
-        assertFalse(supplier.loggingEnabled());
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreatePersistenStoreSupplierWithLoggedConfig() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent()
-                .enableLogging(Collections.singletonMap("retention.ms", "1000"))
-                .build();
-
-        final Map<String, String> config = supplier.logConfig();
-        assertTrue(supplier.loggingEnabled());
-        assertEquals("1000", config.get("retention.ms"));
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreatePersistenStoreSupplierNotLogged() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent()
-                .disableLogging()
-                .build();
-
-        assertFalse(supplier.loggingEnabled());
-    }
-
     @Test
     public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() {
-        final Stores.PersistentKeyValueFactory<String, String> storeFactory = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent();
         try {
-            storeFactory.windowed(1, 1, 1, false);
+            Stores.persistentWindowStore("store", 1, 1, 1, false);
             fail("Should have thrown illegal argument exception as number of segments is less than 2");
         } catch (final IllegalArgumentException e) {
          // ok
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 4ff0b90..a061ff2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -16,12 +16,18 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpReadOnlyStore;
+import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,7 +70,16 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     private KeyValueStore<String, String> newStoreInstance() {
-        return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class);
+        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
+                Serdes.String(),
+                Serdes.String())
+                .build();
+
+        store.init(new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName), Serdes.String(), Serdes.String()),
+                                                    new NoOpRecordCollector()),
+                store);
+
+        return store;
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
deleted file mode 100644
index b25b8cb..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBKeyValueStoreSupplierTest {
-
-    private static final String STORE_NAME = "name";
-    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                          Serdes.String(),
-                                                                          Serdes.String(),
-                                                                          new NoOpRecordCollector(),
-                                                                          cache);
-    private KeyValueStore<String, String> store;
-
-    @After
-    public void close() {
-        store.close();
-    }
-
-    @Test
-    public void shouldCreateLoggingEnabledStoreWhenStoreLogged() {
-        store = createStore(true, false);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertFalse(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
-        store = createStore(false, false);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertTrue(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldHaveCachedKeyValueStoreWhenCachingEnabled() {
-        store = createStore(false, true);
-        store.init(context, store);
-        context.setTime(1);
-        store.put("a", "b");
-        store.put("b", "c");
-        assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingKeyValueStore.class)));
-        assertThat(cache.size(), is(2L));
-    }
-
-    @Test
-    public void shouldReturnMeteredStoreWhenCachingAndLoggingDisabled() {
-        store = createStore(false, false);
-        assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class)));
-    }
-
-    @Test
-    public void shouldReturnMeteredStoreWhenCachingDisabled() {
-        store = createStore(true, false);
-        assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class)));
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenCached() {
-        store = createStore(false, true);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenLogged() {
-        store = createStore(true, false);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @SuppressWarnings("unchecked")
-    private KeyValueStore<String, String> createStore(final boolean logged, final boolean cached) {
-        return new RocksDBKeyValueStoreSupplier<>(STORE_NAME,
-                                                  Serdes.String(),
-                                                  Serdes.String(),
-                                                  logged,
-                                                  Collections.EMPTY_MAP,
-                                                  cached).get();
-    }
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
deleted file mode 100644
index c50dfba..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBSessionStoreSupplierTest {
-
-    private static final String STORE_NAME = "name";
-    private final List<ProducerRecord> logged = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-        Serdes.String(),
-        Serdes.String(),
-        new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    final K key,
-                                    final V value,
-                                    final Integer partition,
-                                    final Long timestamp,
-                                    final Serializer<K> keySerializer,
-                                    final Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
-            }
-        },
-        cache);
-
-    private SessionStore<String, String> store;
-
-    @After
-    public void close() {
-        store.close();
-    }
-
-    @Test
-    public void shouldCreateLoggingEnabledStoreWhenStoreLogged() {
-        store = createStore(true, false);
-        context.setTime(1);
-        store.init(context, store);
-        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
-        assertFalse(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
-        store = createStore(false, false);
-        context.setTime(1);
-        store.init(context, store);
-        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
-        assertTrue(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldReturnCachedSessionStoreWhenCachingEnabled() {
-        store = createStore(false, true);
-        store.init(context, store);
-        context.setTime(1);
-        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
-        store.put(new Windowed<>("b", new SessionWindow(0, 10)), "c");
-        assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingSessionStore.class)));
-        assertThat(cache.size(), is(2L));
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenCached() {
-        store = createStore(false, true);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenLogged() {
-        store = createStore(true, false);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenNotLoggedOrCached() {
-        store = createStore(false, false);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-
-
-    private SessionStore<String, String> createStore(final boolean logged, final boolean cached) {
-        return new RocksDBSessionStoreSupplier<>(STORE_NAME,
-                                                 10,
-                                                 Serdes.String(),
-                                                 Serdes.String(),
-                                                 logged,
-                                                 Collections.<String, String>emptyMap(),
-                                                 cached).get();
-    }
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
deleted file mode 100644
index 7409a13..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBWindowStoreSupplierTest {
-
-    private static final String STORE_NAME = "name";
-    private WindowStore<String, String> store;
-    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                          Serdes.String(),
-                                                                          Serdes.String(),
-                                                                          new NoOpRecordCollector(),
-                                                                          cache);
-
-    @After
-    public void close() {
-        if (store != null) {
-            store.close();
-        }
-    }
-
-    @Test
-    public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() {
-        store = createStore(true, false, 3);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertFalse(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() {
-        store = createStore(false, false, 3);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertTrue(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldBeCachedWindowStoreWhenCachingEnabled() {
-        store = createStore(false, true, 3);
-        store.init(context, store);
-        context.setTime(1);
-        store.put("a", "b");
-        store.put("b", "c");
-        assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingWindowStore.class)));
-        assertThat(context.getCache().size(), is(2L));
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreAsOuterMost() {
-        assertThat(createStore(false, false, 2), instanceOf(MeteredWindowStore.class));
-        assertThat(createStore(false, true, 2), instanceOf(MeteredWindowStore.class));
-        assertThat(createStore(true, false, 2), instanceOf(MeteredWindowStore.class));
-    }
-    @Test
-    public void shouldHaveMeteredStoreWhenCached() {
-        store = createStore(false, true, 3);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenLogged() {
-        store = createStore(true, false, 3);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenNotLoggedOrCached() {
-        store = createStore(false, false, 3);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() {
-        createStore(true, true, 1);
-    }
-
-    @SuppressWarnings("unchecked")
-    private WindowStore<String, String> createStore(final boolean logged, final boolean cached, final int numSegments) {
-        return new RocksDBWindowStoreSupplier<>(STORE_NAME,
-                                                10,
-                                                numSegments,
-                                                false,
-                                                Serdes.String(),
-                                                Serdes.String(),
-                                                10,
-                                                logged,
-                                                Collections.<String, String>emptyMap(),
-                                                cached).get();
-    }
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
deleted file mode 100644
index b1818c2..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-
-import java.util.Collections;
-
-@SuppressWarnings("unchecked")
-public class StateStoreTestUtils {
-
-    public static <K, V> KeyValueStore<K, V> newKeyValueStore(final String name,
-                                                              final String applicationId,
-                                                              final Class<K> keyType,
-                                                              final Class<V> valueType) {
-        final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name,
-                                                                                                 null,
-                                                                                                 null,
-                                                                                                 new MockTime(),
-                                                                                                 false,
-                                                                                                 Collections.<String, String>emptyMap());
-
-        final StateStore stateStore = supplier.get();
-        stateStore.init(
-            new InternalMockProcessorContext(
-                StateSerdes.withBuiltinTypes(
-                    ProcessorStateManager.storeChangelogTopic(applicationId, name),
-                    keyType,
-                    valueType),
-                new NoOpRecordCollector()),
-            stateStore);
-        return (KeyValueStore<K, V>) stateStore;
-
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index dc04536..c24122a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -20,13 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -68,21 +69,13 @@ public class StreamThreadStateStoreProviderTest {
     private StreamThread threadMock;
     private Map<TaskId, StreamTask> tasks;
 
-    @SuppressWarnings("deprecation")
     @Before
     public void before() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("the-source", topicName);
-        builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
-        builder.addStateStore(Stores.create("kv-store")
-            .withStringKeys()
-            .withStringValues().inMemory().build(), "the-processor");
-
-        builder.addStateStore(Stores.create("window-store")
-            .withStringKeys()
-            .withStringValues()
-            .persistent()
-            .windowed(10, 10, 2, false).build(), "the-processor");
+        final TopologyWrapper topology = new TopologyWrapper();
+        topology.addSource("the-source", topicName);
+        topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
+        topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"), Serdes.String(), Serdes.String()), "the-processor");
+        topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store", 10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor");
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";
@@ -96,17 +89,17 @@ public class StreamThreadStateStoreProviderTest {
         configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
         configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
 
-        builder.setApplicationId(applicationId);
-        final ProcessorTopology topology = builder.build(null);
+        topology.setApplicationId(applicationId);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
 
         tasks = new HashMap<>();
         stateDirectory = new StateDirectory(streamsConfig, new MockTime());
 
-        taskOne = createStreamsTask(streamsConfig, clientSupplier, topology, new TaskId(0, 0));
+        taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 0));
         taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0), taskOne);
 
-        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, topology, new TaskId(0, 1));
+        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 1));
         taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1), taskTwo);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index b5379d7..06c14ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,9 +44,15 @@ public class WrappingStoreProviderTest {
         final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
 
 
-        stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class));
+        stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+                Serdes.serdeFrom(String.class),
+                Serdes.serdeFrom(String.class))
+                .build());
         stubProviderOne.addStore("window", new NoOpWindowStore());
-        stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class));
+        stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+                Serdes.serdeFrom(String.class),
+                Serdes.serdeFrom(String.class))
+                .build());
         stubProviderTwo.addStore("window", new NoOpWindowStore());
 
         wrappingStoreProvider = new WrappingStoreProvider(

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message