kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3595: window stores use compact, delete config for changelogs
Date Thu, 08 Sep 2016 01:02:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk eba0ede87 -> 69ebf6f7b


http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index b9a1cf6..2d9b9a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -35,10 +35,12 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -50,8 +52,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests related to internal topics in streams
@@ -65,6 +69,8 @@ public class InternalTopicIntegrationTest {
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+    private Properties streamsConfiguration;
+    private String applicationId = "compact-topics-integration-test";
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
@@ -72,14 +78,20 @@ public class InternalTopicIntegrationTest {
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
     }
 
-    /**
-     * Validates that any state changelog topics are compacted
-     *
-     * @return true if topics have a valid config, false otherwise
-     */
-    private boolean isUsingCompactionForStateChangelogTopics() {
-        boolean valid = true;
+    @Before
+    public void before() {
+        streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    }
+
 
+    private Properties getTopicConfigProperties(final String changelog) {
         // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
         // createTopic() will only seem to work (it will return without error).  The topic will exist in
         // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
@@ -89,33 +101,28 @@ public class InternalTopicIntegrationTest {
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
-        final boolean isSecure = false;
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
-
-        final Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
-        final Iterator it = topicConfigs.iterator();
-        while (it.hasNext()) {
-            final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
-            final String topic = topicConfig._1;
-            final Properties prop = topicConfig._2;
-
-            // state changelogs should be compacted
-            if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
-                if (!prop.containsKey(LogConfig.CleanupPolicyProp()) ||
-                    !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) {
-                    valid = false;
-                    break;
+        try {
+            final boolean isSecure = false;
+            final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
+
+            final Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
+            final Iterator it = topicConfigs.iterator();
+            while (it.hasNext()) {
+                final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
+                final String topic = topicConfig._1;
+                final Properties prop = topicConfig._2;
+                if (topic.equals(changelog)) {
+                    return prop;
                 }
             }
+            return new Properties();
+        } finally {
+            zkClient.close();
         }
-        zkClient.close();
-        return valid;
     }
 
     @Test
     public void shouldCompactTopicsForStateChangelogs() throws Exception {
-        final List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
-
         //
         // Step 1: Configure and start a simple word count topology
         //
@@ -154,6 +161,17 @@ public class InternalTopicIntegrationTest {
         //
         // Step 2: Produce some input data to the input topic.
         //
+        produceData(Arrays.asList("hello", "world", "world", "hello world"));
+
+        //
+        // Step 3: Verify the state changelog topics are compact
+        //
+        streams.close();
+        final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "Counts"));
+        assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp()));
+    }
+
+    private void produceData(final List<String> inputValues) throws java.util.concurrent.ExecutionException, InterruptedException {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
@@ -161,11 +179,47 @@ public class InternalTopicIntegrationTest {
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime);
+    }
+
+    @Test
+    public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
+
+        final int durationMs = 2000;
+        textLines
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+                .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream();
+
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        //
+        // Step 2: Produce some input data to the input topic.
+        //
+        produceData(Arrays.asList("hello", "world", "world", "hello world"));
 
         //
         // Step 3: Verify the state changelog topics are compact
         //
         streams.close();
-        assertEquals(isUsingCompactionForStateChangelogTopics(), true);
+        final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "CountWindows"));
+        final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
+        assertEquals(2, policies.size());
+        assertTrue(policies.contains(LogConfig.Compact()));
+        assertTrue(policies.contains(LogConfig.Delete()));
+        // retention should be 1 day + the window duration
+        final Long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs;
+        assertEquals(retention, Long.valueOf(properties.getProperty(LogConfig.RetentionMsProp())));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index fe66acb..a4c008a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -19,19 +19,24 @@ package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -264,9 +269,9 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
-        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet()));
-        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -302,9 +307,32 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));
-        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-2"))));
-        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-3"))));
+        final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
+        final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
+        final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
+        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
+                                                  Collections.<String, InternalTopicConfig>emptyMap(),
+                                                  Collections.singletonMap(store1,
+                                                                           new InternalTopicConfig(
+                                                                                   store1,
+                                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                                   Collections.<String, String>emptyMap()))));
+        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
+                                                  Collections.<String, InternalTopicConfig>emptyMap(),
+                                                  Collections.singletonMap(store2,
+                                                                           new InternalTopicConfig(
+                                                                                   store2,
+                                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                                   Collections.<String, String>emptyMap()))));
+        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
+                                                  Collections.<String, InternalTopicConfig>emptyMap(),
+                                                  Collections.singletonMap(store3,
+                                                                           new InternalTopicConfig(
+                                                                                   store3,
+                                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                                   Collections.<String, String>emptyMap()))));
+
+
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -390,7 +418,7 @@ public class TopologyBuilderTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
-        builder.addStateStore(null, true);
+        builder.addStateStore(null);
     }
 
     private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
@@ -406,7 +434,7 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
         final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store"));
@@ -417,7 +445,7 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), false, "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
         final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store"));
@@ -430,10 +458,63 @@ public class TopologyBuilderTest {
         builder.addInternalTopic("internal-topic");
         builder.addSource("source", "internal-topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
         final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singleton("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, true, Collections.<String, String>emptyMap()), "processor");
+        final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
+        final Properties properties = topicConfig.toProperties(0);
+        final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
+        assertEquals("appId-store-changelog", topicConfig.name());
+        assertTrue(policies.contains("compact"));
+        assertTrue(policies.contains("delete"));
+        assertEquals(2, policies.size());
+        assertEquals("30000", properties.getProperty(InternalTopicManager.RETENTION_MS));
+        assertEquals(2, properties.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("name", true), "processor");
+        final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog");
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("appId-name-changelog", topicConfig.name());
+        assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        assertEquals(1, properties.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addInternalTopic("foo");
+        builder.addSource("source", "foo");
+        final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
+        final InternalTopicConfig topicConfig = topicsInfo.interSourceTopics.get("appId-foo");
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("appId-foo", topicConfig.name());
+        assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        assertEquals(1, properties.size());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
new file mode 100644
index 0000000..b0a198b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class InternalTopicConfigTest {
+
+    @Test
+    public void shouldHaveCompactionPropSetIfSupplied() throws Exception {
+        final Properties properties = new InternalTopicConfig("name",
+                                                              Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                              Collections.<String, String>emptyMap()).toProperties(0);
+        assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+    }
+
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfNameIsNull() throws Exception {
+        new InternalTopicConfig(null, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
+    }
+
+    @Test
+    public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() throws Exception {
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                        Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete),
+                                                                        Collections.<String, String>emptyMap());
+        final int additionalRetentionMs = 20;
+        topicConfig.setRetentionMs(10);
+        final Properties properties = topicConfig.toProperties(additionalRetentionMs);
+        assertEquals("30", properties.getProperty(InternalTopicManager.RETENTION_MS));
+    }
+
+    @Test
+    public void shouldNotConfigureRetentionMsWhenCompact() throws Exception {
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                        Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                        Collections.<String, String>emptyMap());
+        topicConfig.setRetentionMs(10);
+        final Properties properties = topicConfig.toProperties(0);
+        assertNull(null, properties.getProperty(InternalTopicManager.RETENTION_MS));
+    }
+
+    @Test
+    public void shouldNotConfigureRetentionMsWhenDelete() throws Exception {
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                        Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                        Collections.<String, String>emptyMap());
+        topicConfig.setRetentionMs(10);
+        final Properties properties = topicConfig.toProperties(0);
+        assertNull(null, properties.getProperty(InternalTopicManager.RETENTION_MS));
+    }
+
+
+    @Test
+    public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() throws Exception {
+        assertTrue(new InternalTopicConfig("name",
+                                           Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                           Collections.<String, String>emptyMap()).isCompacted());
+        assertTrue(new InternalTopicConfig("name", Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
+                                                               InternalTopicConfig.CleanupPolicy.delete),
+                                           Collections.<String, String>emptyMap()).isCompacted());
+    }
+
+    @Test
+    public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() throws Exception {
+        assertFalse(new InternalTopicConfig("name",
+                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                            Collections.<String, String>emptyMap()).isCompacted());
+    }
+
+    @Test
+    public void shouldUseCleanupPolicyFromConfigIfSupplied() throws Exception {
+        final InternalTopicConfig config = new InternalTopicConfig("name",
+                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                   Collections.singletonMap("cleanup.policy", "compact"));
+
+        final Properties properties = config.toProperties(0);
+        assertEquals("compact", properties.getProperty("cleanup.policy"));
+    }
+
+    @Test
+    public void shouldHavePropertiesSuppliedByUser() throws Exception {
+        final Map<String, String> configs = new HashMap<>();
+        configs.put("retention.ms", "1000");
+        configs.put("retention.bytes", "10000");
+
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                 Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                 configs);
+
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("1000", properties.getProperty("retention.ms"));
+        assertEquals("10000", properties.getProperty("retention.bytes"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e300966..e5ae7d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -735,15 +735,15 @@ public class StreamPartitionAssignorTest {
         }
 
         @Override
-        public void makeReady(String topic, int numPartitions, boolean compactTopic) {
-            readyTopics.put(topic, numPartitions);
+        public void makeReady(InternalTopicConfig topic, int numPartitions) {
+            readyTopics.put(topic.name(), numPartitions);
 
             List<PartitionInfo> partitions = new ArrayList<>();
             for (int i = 0; i < numPartitions; i++) {
-                partitions.add(new PartitionInfo(topic, i, null, null, null));
+                partitions.add(new PartitionInfo(topic.name(), i, null, null, null));
             }
 
-            restoreConsumer.updatePartitions(topic, partitions);
+            restoreConsumer.updatePartitions(topic.name(), partitions);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
new file mode 100644
index 0000000..18d158d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StoresTest {
+
+    @Test
+    public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception {
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .inMemory()
+                .enableLogging(Collections.singletonMap("retention.ms", "1000"))
+                .build();
+
+        final Map<String, String> config = supplier.logConfig();
+        assertTrue(supplier.loggingEnabled());
+        assertEquals("1000", config.get("retention.ms"));
+    }
+
+    @Test
+    public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception {
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .inMemory()
+                .disableLogging()
+                .build();
+
+        assertFalse(supplier.loggingEnabled());
+    }
+
+    @Test
+    public void shouldCreatePersistenStoreSupplierWithLoggedConfig() throws Exception {
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .persistent()
+                .enableLogging(Collections.singletonMap("retention.ms", "1000"))
+                .build();
+
+        final Map<String, String> config = supplier.logConfig();
+        assertTrue(supplier.loggingEnabled());
+        assertEquals("1000", config.get("retention.ms"));
+    }
+
+    @Test
+    public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception {
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .persistent()
+                .disableLogging()
+                .build();
+
+        assertFalse(supplier.loggingEnabled());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 8a22d37..700655e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -26,125 +26,128 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public abstract class AbstractKeyValueStoreTest {
 
+
+
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
                                                                       Class<K> keyClass, Class<V> valueClass,
                                                                       boolean useContextSerdes);
 
+    protected KeyValueStore<Integer, String> store;
+    protected KeyValueStoreTestDriver<Integer, String> driver;
+
+    @Before
+    public void before() {
+        driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        final MockProcessorContext context = (MockProcessorContext) driver.context();
+        context.setTime(10);
+        store = createKeyValueStore(context, Integer.class, String.class, false);
+    }
+
+    @After
+    public void after() {
+        store.close();
+    }
+
     @Test
     public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-                while (iter.hasNext()) {
-                    KeyValue<Integer, String> entry = iter.next();
-                    if (entry.key.equals(2))
-                        assertEquals("two", entry.value);
-                    else if (entry.key.equals(4))
-                        assertEquals("four", entry.value);
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
+        // Verify that the store reads and writes correctly ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(4, "four");
+        store.put(5, "five");
+        assertEquals(5, driver.sizeOf(store));
+        assertEquals("zero", store.get(0));
+        assertEquals("one", store.get(1));
+        assertEquals("two", store.get(2));
+        assertNull(store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly flushed ...
+        store.flush();
+        assertEquals("zero", driver.flushedEntryStored(0));
+        assertEquals("one", driver.flushedEntryStored(1));
+        assertEquals("two", driver.flushedEntryStored(2));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertEquals(null, driver.flushedEntryStored(5));
+
+        assertEquals(false, driver.flushedEntryRemoved(0));
+        assertEquals(false, driver.flushedEntryRemoved(1));
+        assertEquals(false, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
+
+        // Check range iteration ...
+        try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
+            while (iter.hasNext()) {
+                KeyValue<Integer, String> entry = iter.next();
+                if (entry.key.equals(2))
+                    assertEquals("two", entry.value);
+                else if (entry.key.equals(4))
+                    assertEquals("four", entry.value);
+                else
+                    fail("Unexpected entry: " + entry);
             }
+        }
 
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-                while (iter.hasNext()) {
-                    KeyValue<Integer, String> entry = iter.next();
-                    if (entry.key.equals(2))
-                        assertEquals("two", entry.value);
-                    else if (entry.key.equals(4))
-                        assertEquals("four", entry.value);
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
+        // Check range iteration ...
+        try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
+            while (iter.hasNext()) {
+                KeyValue<Integer, String> entry = iter.next();
+                if (entry.key.equals(2))
+                    assertEquals("two", entry.value);
+                else if (entry.key.equals(4))
+                    assertEquals("four", entry.value);
+                else
+                    fail("Unexpected entry: " + entry);
             }
-        } finally {
-            store.close();
         }
     }
 
     @Test
     public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-        } finally {
-            store.close();
-        }
+        // Verify that the store reads and writes correctly ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(4, "four");
+        store.put(5, "five");
+        assertEquals(5, driver.sizeOf(store));
+        assertEquals("zero", store.get(0));
+        assertEquals("one", store.get(1));
+        assertEquals("two", store.get(2));
+        assertNull(store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly flushed ...
+        store.flush();
+        assertEquals("zero", driver.flushedEntryStored(0));
+        assertEquals("one", driver.flushedEntryStored(1));
+        assertEquals("two", driver.flushedEntryStored(2));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertEquals(null, driver.flushedEntryStored(5));
+
+        assertEquals(false, driver.flushedEntryRemoved(0));
+        assertEquals(false, driver.flushedEntryRemoved(1));
+        assertEquals(false, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
     }
 
     @Test
     public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
+        store.close();
         // Add any entries that will be restored to any store
         // that uses the driver's context ...
         driver.addEntryToRestoreLog(0, "zero");
@@ -154,23 +157,17 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
+        store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        // Verify that the store's contents were properly restored ...
+        assertEquals(0, driver.checkForRestoredEntries(store));
+
+        // and there are no other entries ...
+        assertEquals(4, driver.sizeOf(store));
     }
 
     @Test
     public void testRestoreWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
+        store.close();
         // Add any entries that will be restored to any store
         // that uses the driver's context ...
         driver.addEntryToRestoreLog(0, "zero");
@@ -180,70 +177,51 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
+        store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        // Verify that the store's contents were properly restored ...
+        assertEquals(0, driver.checkForRestoredEntries(store));
+
+        // and there are no other entries ...
+        assertEquals(4, driver.sizeOf(store));
     }
 
     @Test
     public void testPutIfAbsent() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            assertNull(store.putIfAbsent(0, "zero"));
-            assertNull(store.putIfAbsent(1, "one"));
-            assertNull(store.putIfAbsent(2, "two"));
-            assertNull(store.putIfAbsent(4, "four"));
-            assertEquals("four", store.putIfAbsent(4, "unexpected value"));
-            assertEquals(4, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-        } finally {
-            store.close();
-        }
+        // Verify that the store reads and writes correctly ...
+        assertNull(store.putIfAbsent(0, "zero"));
+        assertNull(store.putIfAbsent(1, "one"));
+        assertNull(store.putIfAbsent(2, "two"));
+        assertNull(store.putIfAbsent(4, "four"));
+        assertEquals("four", store.putIfAbsent(4, "unexpected value"));
+        assertEquals(4, driver.sizeOf(store));
+        assertEquals("zero", store.get(0));
+        assertEquals("one", store.get(1));
+        assertEquals("two", store.get(2));
+        assertNull(store.get(3));
+        assertEquals("four", store.get(4));
+
+        // Flush the store and verify all current entries were properly flushed ...
+        store.flush();
+        assertEquals("zero", driver.flushedEntryStored(0));
+        assertEquals("one", driver.flushedEntryStored(1));
+        assertEquals("two", driver.flushedEntryStored(2));
+        assertEquals("four", driver.flushedEntryStored(4));
+
+        assertEquals(false, driver.flushedEntryRemoved(0));
+        assertEquals(false, driver.flushedEntryRemoved(1));
+        assertEquals(false, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(4));
     }
 
     @Test
     public void testSize() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-            assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries());
-
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, store.approximateNumEntries());
-        } finally {
-            store.close();
-        }
+        assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries());
+
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(4, "four");
+        store.put(5, "five");
+        assertEquals(5, store.approximateNumEntries());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 82071b7..fd9ea96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
@@ -51,54 +50,47 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void testEvict() {
         // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(3, "three");
+        store.put(4, "four");
+        store.put(5, "five");
+        store.put(6, "six");
+        store.put(7, "seven");
+        store.put(8, "eight");
+        store.put(9, "nine");
+        assertEquals(10, driver.sizeOf(store));
 
-        try {
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(3, "three");
-            store.put(4, "four");
-            store.put(5, "five");
-            store.put(6, "six");
-            store.put(7, "seven");
-            store.put(8, "eight");
-            store.put(9, "nine");
-            assertEquals(10, driver.sizeOf(store));
+        store.put(10, "ten");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertTrue(driver.flushedEntryRemoved(0));
+        assertEquals(1, driver.numFlushedEntryRemoved());
 
-            store.put(10, "ten");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertTrue(driver.flushedEntryRemoved(0));
-            assertEquals(1, driver.numFlushedEntryRemoved());
+        store.delete(1);
+        store.flush();
+        assertEquals(9, driver.sizeOf(store));
+        assertTrue(driver.flushedEntryRemoved(0));
+        assertTrue(driver.flushedEntryRemoved(1));
+        assertEquals(2, driver.numFlushedEntryRemoved());
 
-            store.delete(1);
-            store.flush();
-            assertEquals(9, driver.sizeOf(store));
-            assertTrue(driver.flushedEntryRemoved(0));
-            assertTrue(driver.flushedEntryRemoved(1));
-            assertEquals(2, driver.numFlushedEntryRemoved());
+        store.put(11, "eleven");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertEquals(2, driver.numFlushedEntryRemoved());
 
-            store.put(11, "eleven");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertEquals(2, driver.numFlushedEntryRemoved());
+        store.put(2, "two-again");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertEquals(2, driver.numFlushedEntryRemoved());
 
-            store.put(2, "two-again");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertEquals(2, driver.numFlushedEntryRemoved());
-
-            store.put(12, "twelve");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertTrue(driver.flushedEntryRemoved(0));
-            assertTrue(driver.flushedEntryRemoved(1));
-            assertTrue(driver.flushedEntryRemoved(3));
-            assertEquals(3, driver.numFlushedEntryRemoved());
-        } finally {
-            store.close();
-        }
+        store.put(12, "twelve");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertTrue(driver.flushedEntryRemoved(0));
+        assertTrue(driver.flushedEntryRemoved(1));
+        assertTrue(driver.flushedEntryRemoved(3));
+        assertEquals(3, driver.numFlushedEntryRemoved());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 84c0320..521fa32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -64,7 +65,14 @@ public class RocksDBWindowStoreTest {
 
     @SuppressWarnings("unchecked")
     protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context) {
-        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, intSerde, stringSerde);
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName,
+                                                                       retentionPeriod,
+                                                                       numSegments,
+                                                                       true,
+                                                                       intSerde,
+                                                                       stringSerde,
+                                                                       true,
+                                                                       Collections.<String, String>emptyMap());
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context, store);

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index ec5d841..39b127f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -29,12 +29,18 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 
+import java.util.Collections;
+
 @SuppressWarnings("unchecked")
 public class StateStoreTestUtils {
 
     public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) {
         final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name,
-                null, null, new MockTime());
+                                                                                                 null,
+                                                                                                 null,
+                                                                                                 new MockTime(),
+                                                                                                 false,
+                                                                                                 Collections.<String, String>emptyMap());
 
         final StateStore stateStore = supplier.get();
         stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType),

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 19cd8e9..7675f9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -41,7 +40,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
     private final Map<Integer, String> written = new HashMap<>();
 
-    private final ProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
+    private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollector(null, "StoreChangeLoggerTest") {
                 @SuppressWarnings("unchecked")
                 @Override
@@ -69,6 +68,7 @@ public class StoreChangeLoggerTest {
 
     @Test
     public void testAddRemove() {
+        context.setTime(1);
         written.put(0, "zero");
         changeLogger.add(0);
         written.put(1, "one");

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index f24dfda..3532623 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
 
 public class MockStateStoreSupplier implements StateStoreSupplier {
     private final String name;
@@ -55,6 +57,16 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         }
     }
 
+    @Override
+    public Map<String, String> logConfig() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public boolean loggingEnabled() {
+        return loggingEnabled;
+    }
+
     public static class MockStateStore implements StateStore {
         private final String name;
         private final boolean persistent;


Mime
View raw message