kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-6150: KIP-204 part III; Change repartition topic segment size and ms
Date Wed, 20 Dec 2017 00:05:48 GMT
KAFKA-6150: KIP-204 part III; Change repartition topic segment size and ms

1. Create default internal topic configs in StreamsConfig; in particular, for repartition topics, change the segment size and segment time to smaller values.
2. Consolidate the default internal topic settings to InternalTopicManager and simplify InternalTopicConfig correspondingly.
3. Add an integration test for purging data.
4. MINOR: change TopologyBuilderException to IllegalStateException in StreamPartitionAssignor (part of https://issues.apache.org/jira/browse/KAFKA-5660).

Here are a few public facing APIs that get added:

1. AbstractConfig#originalsWithPrefix(String prefix, boolean strip): this simplifies the logic of passing admin- and topic-prefixed configs into consumer properties.
2. KafkaStreams constructor with a Time object, for convenient mocking in tests.

Will update KIP-204 accordingly if people re-vote on these changes.

Author: Guozhang Wang <wangguoz@gmail.com>
Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #4315 from guozhangwang/K6150-segment-size


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82c6d429
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82c6d429
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82c6d429

Branch: refs/heads/trunk
Commit: 82c6d429e70f1c782103953072dc6dbec650dd6e
Parents: f3b9afe
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Dec 19 16:05:42 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 19 16:05:42 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/ConfigEntry.java |  28 +++
 .../apache/kafka/clients/admin/NewTopic.java    |   7 +
 .../kafka/common/config/AbstractConfig.java     |  19 +-
 .../requests/DescribeLogDirsResponse.java       |  11 +
 .../kafka/clients/admin/MockAdminClient.java    |  21 +-
 core/src/main/scala/kafka/log/Log.scala         |   4 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |  23 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  24 +-
 .../internals/InternalTopicConfig.java          |  84 +------
 .../internals/InternalTopicManager.java         |  23 +-
 .../internals/InternalTopologyBuilder.java      |  17 +-
 .../internals/RepartitionTopicConfig.java       |  85 +++++++
 .../internals/StreamPartitionAssignor.java      |   2 +-
 .../UnwindowedChangelogTopicConfig.java         |  81 +++++++
 .../internals/WindowedChangelogTopicConfig.java |  95 ++++++++
 .../apache/kafka/streams/StreamsConfigTest.java |   7 +-
 .../PurgeRepartitionTopicIntegrationTest.java   | 222 +++++++++++++++++++
 .../streams/processor/TopologyBuilderTest.java  |  54 ++---
 .../CopartitionedTopicsValidatorTest.java       |   5 +-
 .../internals/InternalTopicConfigTest.java      |  90 +-------
 .../internals/InternalTopicManagerTest.java     |  47 +++-
 .../internals/InternalTopologyBuilderTest.java  |  58 ++---
 .../kafka/test/MockInternalTopicManager.java    |   2 +-
 23 files changed, 736 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 90ba5d4..3f24c81 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -100,6 +100,34 @@ public class ConfigEntry {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConfigEntry that = (ConfigEntry) o;
+
+        return this.name.equals(that.name) &&
+                this.value != null ? this.value.equals(that.value) : that.value == null &&
+                this.isDefault == that.isDefault &&
+                this.isSensitive == that.isSensitive &&
+                this.isReadOnly == that.isReadOnly;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + name.hashCode();
+        result = prime * result + ((value == null) ? 0 : value.hashCode());
+        result = prime * result + (isDefault ? 1 : 0);
+        result = prime * result + (isSensitive ? 1 : 0);
+        result = prime * result + (isReadOnly ? 1 : 0);
+        return result;
+    }
+
+    @Override
     public String toString() {
         return "ConfigEntry(" +
                 "name=" + name +

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
index ff09579..c4bc218 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -98,6 +98,13 @@ public class NewTopic {
         return this;
     }
 
+    /**
+     * The configuration for the new topic or null if no configs ever specified.
+     */
+    public Map<String, String> configs() {
+        return configs;
+    }
+
     TopicDetails convertToTopicDetails() {
         if (replicasAssignments != null) {
             if (configs != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index dc80d98..61a5798 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -176,10 +176,25 @@ public class AbstractConfig {
      * @return a Map containing the settings with the prefix
      */
     public Map<String, Object> originalsWithPrefix(String prefix) {
+        return originalsWithPrefix(prefix, true);
+    }
+
+    /**
+     * Gets all original settings with the given prefix.
+     *
+     * @param prefix the prefix to use as a filter
+     * @param strip strip the prefix before adding to the output if set true
+     * @return a Map containing the settings with the prefix
+     */
+    public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) {
         Map<String, Object> result = new RecordingMap<>(prefix, false);
         for (Map.Entry<String, ?> entry : originals.entrySet()) {
-            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
-                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+                if (strip)
+                    result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+                else
+                    result.put(entry.getKey(), entry.getValue());
+            }
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index a242240..9b68a9e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -192,6 +192,17 @@ public class DescribeLogDirsResponse extends AbstractResponse {
             this.error = error;
             this.replicaInfos = replicaInfos;
         }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(error=")
+                    .append(error)
+                    .append(", replicas=")
+                    .append(replicaInfos)
+                    .append(")");
+            return builder.toString();
+        }
     }
 
     static public class ReplicaInfo {

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 11fe428..d00bad0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -121,7 +121,7 @@ public class MockAdminClient extends AdminClient {
             for (int p = 0; p < numberOfPartitions; ++p) {
                 partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.<Node>emptyList()));
             }
-            allTopics.put(topicName, new TopicMetadata(false, partitions, newTopic.convertToTopicDetails().configs));
+            allTopics.put(topicName, new TopicMetadata(false, partitions, newTopic.configs()));
             future.complete(null);
             createTopicResult.put(topicName, future);
         }
@@ -220,7 +220,24 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        Map<ConfigResource, KafkaFuture<Config>> configescriptions = new HashMap<>();
+
+        for (ConfigResource resource : resources) {
+            if (resource.type() == ConfigResource.Type.TOPIC) {
+                Map<String, String> configs = allTopics.get(resource.name()).configs;
+                List<ConfigEntry> configEntries = new ArrayList<>();
+                for (Map.Entry<String, String> entry : configs.entrySet()) {
+                    configEntries.add(new ConfigEntry(entry.getKey(), entry.getValue()));
+                }
+                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+                future.complete(new Config(configEntries));
+                configescriptions.put(resource, future);
+            } else {
+                throw new UnsupportedOperationException("Not implemented yet");
+            }
+        }
+
+        return new DescribeConfigsResult(configescriptions);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a46fffe..44d255b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1226,12 +1226,14 @@ class Log(@volatile var dir: File,
         false
       }
     }
+
     deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
       nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+
     deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
   }
 
@@ -1566,7 +1568,7 @@ class Log(@volatile var dir: File,
    * @param segment The log segment to schedule for deletion
    */
   private def deleteSegment(segment: LogSegment) {
-    info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
+    info(s"Scheduling log segment [baseOffset ${segment.baseOffset}, size ${segment.size}] for log $name for deletion.")
     lock synchronized {
       segments.remove(segment.baseOffset)
       asyncDeleteSegment(segment)

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d10d786..c7ae613 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -590,11 +590,32 @@ public class KafkaStreams {
         this(topology.internalTopologyBuilder, config, clientSupplier);
     }
 
+    /**
+     * Create a {@code KafkaStreams} instance.
+     *
+     * @param topology       the topology specifying the computational logic
+     * @param config         the Kafka Streams configuration
+     * @param time           {@code Time} implementation; cannot be null
+     * @throws StreamsException if any fatal error occurs
+     */
+    public KafkaStreams(final Topology topology,
+                        final StreamsConfig config,
+                        final Time time) {
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time);
+    }
+
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
                          final KafkaClientSupplier clientSupplier) throws StreamsException {
+        this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
+    }
+
+    private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+                         final StreamsConfig config,
+                         final KafkaClientSupplier clientSupplier,
+                         final Time time) throws StreamsException {
         this.config = config;
-        time = Time.SYSTEM;
+        this.time = time;
 
         // The application ID is a required config and hence should always have value
         processId = UUID.randomUUID();

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 21c9759..b4908e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -121,6 +122,8 @@ public class StreamsConfig extends AbstractConfig {
      * These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}.
      * It is recommended to use {@link #topicPrefix(String)}.
      */
+    // TODO: currently we cannot get the full topic configurations and hence cannot allow topic configs without the prefix,
+    //       this can be lifted once kafka.log.LogConfig is completely deprecated by org.apache.kafka.common.config.TopicConfig
     public static final String TOPIC_PREFIX = "topic.";
 
     /**
@@ -756,13 +759,32 @@ public class StreamsConfig extends AbstractConfig {
 
         // add configs required for stream partition assignor
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
+        consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
         consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
         consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
-        consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
+
+        // add admin retries configs for creating topics
         final AdminClientConfig config = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
         consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), config.getInt(AdminClientConfig.RETRIES_CONFIG));
 
+        // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
+        final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
+
+        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
+            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
+            final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
+            final int batchSize = producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG) ? Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()) : 16384;
+
+            if (segmentSize < batchSize) {
+                throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
+                        segmentSize,
+                        batchSize));
+            }
+        }
+
+        consumerProps.putAll(topicProps);
+
         return consumerProps;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index c6758d3..888366b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -20,44 +20,27 @@ import org.apache.kafka.common.internals.Topic;
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
 
 /**
  * InternalTopicConfig captures the properties required for configuring
  * the internal topics we create for change-logs and repartitioning etc.
  */
-public class InternalTopicConfig {
-    public enum CleanupPolicy { compact, delete }
+public abstract class InternalTopicConfig {
 
-    private final String name;
-    private int numberOfPartitions = -1;
-    private final Map<String, String> logConfig;
-    private final Set<CleanupPolicy> cleanupPolicies;
+    // we need to distinguish windowed and un-windowed store changelog since their cleanup policy may be different
+    public enum InternalTopicType { REPARTITION, WINDOWED_STORE_CHANGELOG, UNWINDOWED_STORE_CHANGELOG }
+
+    final String name;
+    final Map<String, String> topicConfigs;
 
-    private Long retentionMs;
+    private int numberOfPartitions = -1;
 
-    public InternalTopicConfig(final String name,
-                               final Set<CleanupPolicy> defaultCleanupPolicies,
-                               final Map<String, String> logConfig) {
+    InternalTopicConfig(final String name, final Map<String, String> topicConfigs) {
         Objects.requireNonNull(name, "name can't be null");
         Topic.validate(name);
 
-        if (defaultCleanupPolicies.isEmpty()) {
-            throw new IllegalArgumentException("Must provide at least one cleanup policy.");
-        }
         this.name = name;
-        this.cleanupPolicies = defaultCleanupPolicies;
-        this.logConfig = logConfig;
-    }
-
-    /* for test use only */
-    boolean isCompacted() {
-        return cleanupPolicies.contains(CleanupPolicy.compact);
-    }
-
-    private boolean isCompactDelete() {
-        return cleanupPolicies.contains(CleanupPolicy.compact) && cleanupPolicies.contains(CleanupPolicy.delete);
+        this.topicConfigs = topicConfigs;
     }
 
     /**
@@ -67,28 +50,7 @@ public class InternalTopicConfig {
      * @param additionalRetentionMs - added to retention to allow for clock drift etc
      * @return Properties to be used when creating the topic
      */
-    public Properties toProperties(final long additionalRetentionMs) {
-        final Properties result = new Properties();
-        for (Map.Entry<String, String> configEntry : logConfig.entrySet()) {
-            result.put(configEntry.getKey(), configEntry.getValue());
-        }
-        if (retentionMs != null && isCompactDelete()) {
-            result.put(InternalTopicManager.RETENTION_MS, String.valueOf(retentionMs + additionalRetentionMs));
-        }
-
-        if (!logConfig.containsKey(InternalTopicManager.CLEANUP_POLICY_PROP)) {
-            final StringBuilder builder = new StringBuilder();
-            for (CleanupPolicy cleanupPolicy : cleanupPolicies) {
-                builder.append(cleanupPolicy.name()).append(",");
-            }
-            builder.deleteCharAt(builder.length() - 1);
-
-            result.put(InternalTopicManager.CLEANUP_POLICY_PROP, builder.toString());
-        }
-
-
-        return result;
-    }
+    abstract public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs);
 
     public String name() {
         return name;
@@ -108,35 +70,11 @@ public class InternalTopicConfig {
         this.numberOfPartitions = numberOfPartitions;
     }
 
-    void setRetentionMs(final long retentionMs) {
-        if (!logConfig.containsKey(InternalTopicManager.RETENTION_MS)) {
-            this.retentionMs = retentionMs;
-        }
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        final InternalTopicConfig that = (InternalTopicConfig) o;
-        return Objects.equals(name, that.name) &&
-                Objects.equals(logConfig, that.logConfig) &&
-                Objects.equals(retentionMs, that.retentionMs) &&
-                Objects.equals(cleanupPolicies, that.cleanupPolicies);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(name, logConfig, retentionMs, cleanupPolicies);
-    }
-
     @Override
     public String toString() {
         return "InternalTopicConfig(" +
                 "name=" + name +
-                ", logConfig=" + logConfig +
-                ", cleanupPolicies=" + cleanupPolicies +
-                ", retentionMs=" + retentionMs +
+                ", topicConfigs=" + topicConfigs +
                 ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index cae3128..f093d83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -37,18 +37,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 public class InternalTopicManager {
-
-    private static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
-
-    public final static String CLEANUP_POLICY_PROP = "cleanup.policy";
-    public final static String RETENTION_MS = "retention.ms";
-
     private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " +
         "Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
 
@@ -62,9 +54,8 @@ public class InternalTopicManager {
     private final int retries;
 
     public InternalTopicManager(final AdminClient adminClient,
-                                final Map<String, ?> config) {
+                                final StreamsConfig streamsConfig) {
         this.adminClient = adminClient;
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
 
         LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
         log = logContext.logger(getClass());
@@ -102,11 +93,13 @@ public class InternalTopicManager {
             final Set<NewTopic> newTopics = new HashSet<>();
 
             for (final InternalTopicConfig internalTopicConfig : topicsToBeCreated) {
-                final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
-                final Map<String, String> topicConfig = new HashMap<>(defaultTopicConfigs);
-                for (final String key : topicProperties.stringPropertyNames()) {
-                    topicConfig.put(key, topicProperties.getProperty(key));
-                }
+                final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+
+                log.debug("Going to create topic {} with {} partitions and config {}.",
+                        internalTopicConfig.name(),
+                        internalTopicConfig.numberOfPartitions(),
+                        topicConfig);
+
                 newTopics.add(
                     new NewTopic(
                         internalTopicConfig.name(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 90d46aa..e71959b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1027,7 +1027,7 @@ public class InternalTopologyBuilder {
         for (final Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
             final Set<String> sinkTopics = new HashSet<>();
             final Set<String> sourceTopics = new HashSet<>();
-            final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
+            final Map<String, InternalTopicConfig> repartitionTopics = new HashMap<>();
             final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
             for (final String node : entry.getValue()) {
                 // if the node is a source node, add to the source topics
@@ -1042,9 +1042,7 @@ public class InternalTopologyBuilder {
                         if (internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id
                             final String internalTopic = decorateTopic(topic);
-                            internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
-                                                                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                                                                            Collections.<String, String>emptyMap()));
+                            repartitionTopics.put(internalTopic, new RepartitionTopicConfig(internalTopic, Collections.<String, String>emptyMap()));
                             sourceTopics.add(internalTopic);
                         } else {
                             sourceTopics.add(topic);
@@ -1076,7 +1074,7 @@ public class InternalTopologyBuilder {
                 topicGroups.put(entry.getKey(), new TopicsInfo(
                         Collections.unmodifiableSet(sinkTopics),
                         Collections.unmodifiableSet(sourceTopics),
-                        Collections.unmodifiableMap(internalSourceTopics),
+                        Collections.unmodifiableMap(repartitionTopics),
                         Collections.unmodifiableMap(stateChangelogTopics)));
             }
         }
@@ -1120,14 +1118,9 @@ public class InternalTopologyBuilder {
     private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
                                                            final String name) {
         if (!factory.isWindowStore()) {
-            return new InternalTopicConfig(name,
-                                           Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                           factory.logConfig());
+            return new UnwindowedChangelogTopicConfig(name, factory.logConfig());
         } else {
-            final InternalTopicConfig config = new InternalTopicConfig(name,
-                    Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
-                                InternalTopicConfig.CleanupPolicy.delete),
-                    factory.logConfig());
+            final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig());
             config.setRetentionMs(factory.retentionPeriod());
             return config;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
new file mode 100644
index 0000000..1459310
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * RepartitionTopicConfig captures the properties required for configuring
+ * the repartition topics: cleanup.policy=delete with small segments so purged records free disk quickly.
+ */
+public class RepartitionTopicConfig extends InternalTopicConfig {
+
+    private static final Map<String, String> REPARTITION_TOPIC_DEFAULT_OVERRIDES;
+    static {
+        final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
+        tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");     // 50 MB
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");           // 50 MB
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");                // 10 min
+        REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
+    }
+
+    public RepartitionTopicConfig(final String name, final Map<String, String> topicConfigs) {
+        super(name, topicConfigs);
+    }
+
+    /**
+     * Get the configured properties for this topic, merged with precedence:
+     * library defaults < global config overrides < per-topic config overrides.
+     * @param defaultProperties     global default topic configs, e.g. from the streams config topic prefix
+     * @param additionalRetentionMs - unused here: repartition topics use cleanup.policy=delete, not compact,delete
+     * @return Properties to be used when creating the topic
+     */
+    public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
+        // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
+        final Map<String, String> topicConfig = new HashMap<>(REPARTITION_TOPIC_DEFAULT_OVERRIDES);
+
+        topicConfig.putAll(defaultProperties);
+
+        topicConfig.putAll(topicConfigs);
+
+        return topicConfig;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final RepartitionTopicConfig that = (RepartitionTopicConfig) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(topicConfigs, that.topicConfigs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, topicConfigs);
+    }
+
+    @Override
+    public String toString() {
+        return "RepartitionTopicConfig(" +
+                "name=" + name +
+                ", topicConfigs=" + topicConfigs +
+                ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 57c69c8..d5a1fe6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -228,7 +228,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             this.userEndPoint = userEndPoint;
         }
 
-        internalTopicManager = new InternalTopicManager(taskManager.adminClient, configs);
+        internalTopicManager = new InternalTopicManager(taskManager.adminClient, streamsConfig);
 
         copartitionedTopicsValidator = new CopartitionedTopicsValidator(logPrefix);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
new file mode 100644
index 0000000..4606a56
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * UnwindowedChangelogTopicConfig captures the properties required for configuring
+ * the un-windowed store changelog topics (cleanup.policy=compact only).
+ */
+public class UnwindowedChangelogTopicConfig extends InternalTopicConfig {
+    private static final Map<String, String> UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES;
+    static {
+        final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
+        tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
+    }
+
+    public UnwindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
+        super(name, topicConfigs);
+    }
+
+    /**
+     * Get the configured properties for this topic, merged with precedence:
+     * library defaults < global config overrides < per-topic config overrides.
+     * @param defaultProperties     global default topic configs, e.g. from the streams config topic prefix
+     * @param additionalRetentionMs - unused here: compact-only changelogs do not set retention.ms
+     * @return Properties to be used when creating the topic
+     */
+    public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
+        // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
+        final Map<String, String> topicConfig = new HashMap<>(UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES);
+
+        topicConfig.putAll(defaultProperties);
+
+        topicConfig.putAll(topicConfigs);
+
+        return topicConfig;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final UnwindowedChangelogTopicConfig that = (UnwindowedChangelogTopicConfig) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(topicConfigs, that.topicConfigs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, topicConfigs);
+    }
+
+    @Override
+    public String toString() {
+        return "UnwindowedChangelogTopicConfig(" +
+                "name=" + name +
+                ", topicConfigs=" + topicConfigs +
+                ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
new file mode 100644
index 0000000..d42642a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * WindowedChangelogTopicConfig captures the properties required for configuring
+ * the windowed store changelog topics (cleanup.policy=compact,delete).
+ */
+public class WindowedChangelogTopicConfig extends InternalTopicConfig {
+    private static final Map<String, String> WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES;
+    static {
+        final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
+        tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE);
+        WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
+    }
+
+    private Long retentionMs;    // null unless set via setRetentionMs(); only then is retention.ms added to the properties
+
+    public WindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
+        super(name, topicConfigs);
+    }
+
+    /**
+     * Get the configured properties for this topic. If retentionMs is set then
+     * we add additionalRetentionMs to work out the desired retention when cleanup.policy=compact,delete
+     * @param defaultProperties     global default topic configs, e.g. from the streams config topic prefix
+     * @param additionalRetentionMs - added to retention to allow for clock drift etc
+     * @return Properties to be used when creating the topic
+     */
+    public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
+        // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
+        final Map<String, String> topicConfig = new HashMap<>(WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES);
+
+        topicConfig.putAll(defaultProperties);
+
+        topicConfig.putAll(topicConfigs);
+
+        if (retentionMs != null) {
+            topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs + additionalRetentionMs));
+        }
+
+        return topicConfig;
+    }
+
+    void setRetentionMs(final long retentionMs) {
+        if (!topicConfigs.containsKey(TopicConfig.RETENTION_MS_CONFIG)) {    // a user-provided retention.ms override wins over the store's retention period
+            this.retentionMs = retentionMs;
+        }
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final WindowedChangelogTopicConfig that = (WindowedChangelogTopicConfig) o;
+        return Objects.equals(name, that.name) &&
+                Objects.equals(topicConfigs, that.topicConfigs) &&
+                Objects.equals(retentionMs, that.retentionMs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, topicConfigs, retentionMs);
+    }
+
+    @Override
+    public String toString() {
+        return "WindowedChangelogTopicConfig(" +
+                "name=" + name +
+                ", topicConfigs=" + topicConfigs +
+                ", retentionMs=" + retentionMs +
+                ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 12db711..1d6b5a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -110,6 +111,8 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L);
         props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
+        props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
+        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         final String groupId = "example-application";
@@ -121,7 +124,9 @@ public class StreamsConfigTest {
         assertEquals(StreamPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
         assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
         assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
-        assertEquals(10, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+        assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG));
+        assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+        assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
new file mode 100644
index 0000000..18ffb87
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Category({IntegrationTest.class})
+public class PurgeRepartitionTopicIntegrationTest {    // verifies committed repartition-topic records get purged (KAFKA-6150)
+
+    private static final int NUM_BROKERS = 1;
+
+    private static final String INPUT_TOPIC = "input-stream";
+    private static final String APPLICATION_ID = "restore-test";
+    private static final String REPARTITION_TOPIC = APPLICATION_ID + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition";
+
+    private static AdminClient adminClient;
+    private static KafkaStreams kafkaStreams;
+    private static Integer purgeIntervalMs = 10;       // reused as broker retention-check interval, segment.ms, and commit interval
+    private static Integer purgeSegmentBytes = 2000;   // tiny segments so rolled segments become deletable quickly
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
+        {
+            put("log.retention.check.interval.ms", purgeIntervalMs);
+            put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 0);
+        }
+    });
+
+    private Time time = CLUSTER.time;
+
+    private class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition {
+        @Override
+        final public boolean conditionMet() {    // NOTE(review): prefer "public final" modifier order
+            try {
+                Set<String> topics = adminClient.listTopics().names().get();
+
+                if (!topics.contains(REPARTITION_TOPIC)) {
+                    return false;
+                }
+            } catch (final Exception e) {
+                return false;
+            }
+
+            try {
+                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, REPARTITION_TOPIC);
+                Config config = adminClient.describeConfigs(Collections.singleton(resource))
+                        .values().get(resource).get();
+                return config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().equals(TopicConfig.CLEANUP_POLICY_DELETE)
+                        && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(purgeIntervalMs.toString())
+                        && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(purgeSegmentBytes.toString());
+            } catch (final Exception e) {
+                return false;    // configs not readable yet; keep polling
+            }
+        }
+    }
+
+    private interface TopicSizeVerifier {
+
+        boolean verify(long currentSize);
+    }
+
+    private class RepartitionTopicVerified implements TestCondition {
+        private final TopicSizeVerifier verifier;
+
+        RepartitionTopicVerified(TopicSizeVerifier verifier) {
+            this.verifier = verifier;
+        }
+
+        @Override
+        public final boolean conditionMet() {
+            time.sleep(purgeIntervalMs);    // advance time so broker retention checks can fire — CLUSTER.time presumably a mock clock; confirm
+
+            try {
+                final Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo = adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();
+
+                for (final DescribeLogDirsResponse.LogDirInfo partitionInfo : logDirInfo) {
+                    final DescribeLogDirsResponse.ReplicaInfo replicaInfo = partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0));
+                    if (replicaInfo != null && verifier.verify(replicaInfo.size)) {
+                        return true;
+                    }
+                }
+
+            } catch (final Exception e) {
+                // swallow and retry: log-dir info may be transiently unavailable
+            }
+
+            return false;
+        }
+    }
+
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
+    }
+
+    @Before
+    public void setup() {
+        // create admin client for verification
+        Properties adminConfig = new Properties();
+        adminConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        adminClient = AdminClient.create(adminConfig);
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(APPLICATION_ID).getPath());
+        streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), purgeIntervalMs);
+        streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), purgeSegmentBytes);
+        streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), purgeSegmentBytes / 2);    // we cannot allow batch size larger than segment size
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs);
+
+        StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(INPUT_TOPIC)
+               .groupBy(MockKeyValueMapper.SelectKeyKeyValueMapper())
+               .count();
+
+        kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(streamsConfiguration), time);
+    }
+
+    @After
+    public void shutdown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(30, TimeUnit.SECONDS);
+        }
+    }
+
+
+    @Test
+    public void shouldRestoreState() throws InterruptedException, ExecutionException {    // NOTE(review): misleading name — this verifies purging, not state restoration
+        // produce some data to input topic
+        final List<KeyValue<Integer, Integer>> messages = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            messages.add(new KeyValue<>(i, i));
+        }
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC,
+                messages,
+                TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                        IntegerSerializer.class,
+                        IntegerSerializer.class),
+                time.milliseconds());
+
+        kafkaStreams.start();
+
+        TestUtils.waitForCondition(new RepartitionTopicCreatedWithExpectedConfigs(), 60000,
+                "Repartition topic " + REPARTITION_TOPIC + " not created with the expected configs after 60000 ms.");
+
+        TestUtils.waitForCondition(
+                new RepartitionTopicVerified(new TopicSizeVerifier() {
+                    @Override
+                    public boolean verify(long currentSize) {
+                        return currentSize > 0;
+                    }
+                }),
+                60000,
+                "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms."
+        );
+
+        // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        TestUtils.waitForCondition(
+                new RepartitionTopicVerified(new TopicSizeVerifier() {
+                    @Override
+                    public boolean verify(long currentSize) {
+                        return currentSize <= purgeSegmentBytes;
+                    }
+                }),
+                60000,
+                "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d21d2e3..ac3cd49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
@@ -23,11 +24,11 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
-import org.apache.kafka.streams.processor.internals.InternalTopicManager;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -38,7 +39,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -407,27 +407,13 @@ public class TopologyBuilderTest {
         final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
         expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
                                                   Collections.<String, InternalTopicConfig>emptyMap(),
-                                                  Collections.singletonMap(store1,
-                                                                           new InternalTopicConfig(
-                                                                                   store1,
-                                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                                                                   Collections.<String, String>emptyMap()))));
+                                                  Collections.singletonMap(store1, (InternalTopicConfig)  new UnwindowedChangelogTopicConfig(store1, Collections.<String, String>emptyMap()))));
         expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
                                                   Collections.<String, InternalTopicConfig>emptyMap(),
-                                                  Collections.singletonMap(store2,
-                                                                           new InternalTopicConfig(
-                                                                                   store2,
-                                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                                                                   Collections.<String, String>emptyMap()))));
+                                                  Collections.singletonMap(store2, (InternalTopicConfig)  new UnwindowedChangelogTopicConfig(store2, Collections.<String, String>emptyMap()))));
         expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
                                                   Collections.<String, InternalTopicConfig>emptyMap(),
-                                                  Collections.singletonMap(store3,
-                                                                           new InternalTopicConfig(
-                                                                                   store3,
-                                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                                                                   Collections.<String, String>emptyMap()))));
-
-
+                                                  Collections.singletonMap(store3, (InternalTopicConfig)  new UnwindowedChangelogTopicConfig(store3, Collections.<String, String>emptyMap()))));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -561,7 +547,7 @@ public class TopologyBuilderTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() {
+    public void shouldAddInternalTopicConfigForWindowStores() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId("appId");
         builder.addSource("source", "topic");
@@ -570,44 +556,38 @@ public class TopologyBuilderTest {
         final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
         final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Properties properties = topicConfig.toProperties(0);
-        final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
-        assertEquals("appId-store-changelog", topicConfig.name());
-        assertTrue(policies.contains("compact"));
-        assertTrue(policies.contains("delete"));
-        assertEquals(2, policies.size());
-        assertEquals("30000", properties.getProperty(InternalTopicManager.RETENTION_MS));
+        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
         assertEquals(2, properties.size());
+        assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
+        assertEquals("appId-store-changelog", topicConfig.name());
     }
 
     @Test
-    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() {
+    public void shouldAddInternalTopicConfigForNonWindowStores() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId("appId");
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("name", true), "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", true), "processor");
         final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
         final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog");
-        final Properties properties = topicConfig.toProperties(0);
-        assertEquals("appId-name-changelog", topicConfig.name());
-        assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
+        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
         assertEquals(1, properties.size());
+        assertEquals("appId-store-changelog", topicConfig.name());
     }
 
     @Test
-    public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() {
+    public void shouldAddInternalTopicConfigForRepartitionTopics() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId("appId");
         builder.addInternalTopic("foo");
         builder.addSource("source", "foo");
         final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
-        final Properties properties = topicConfig.toProperties(0);
+        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
+        assertEquals(4, properties.size());
         assertEquals("appId-foo", topicConfig.name());
-        assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
-        assertEquals(1, properties.size());
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index 6966d67..19277e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -122,10 +122,7 @@ public class CopartitionedTopicsValidatorTest {
     private StreamPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
                                                                               final int partitions) {
         final InternalTopicConfig repartitionTopicConfig
-                = new InternalTopicConfig(repartitionTopic,
-                                          Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                          Collections.<String, String>emptyMap());
-
+                = new RepartitionTopicConfig(repartitionTopic, Collections.<String, String>emptyMap());
 
         final StreamPartitionAssignor.InternalTopicMetadata metadata
                 = new StreamPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
index 581c8cb..eaae352 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -16,113 +16,45 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class InternalTopicConfigTest {
 
-    @Test
-    public void shouldHaveCompactionPropSetIfSupplied() {
-        final Properties properties = new InternalTopicConfig("name",
-                                                              Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                                              Collections.<String, String>emptyMap()).toProperties(0);
-        assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
-    }
-
-
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfNameIsNull() {
-        new InternalTopicConfig(null, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
+        new RepartitionTopicConfig(null, Collections.<String, String>emptyMap());
     }
 
     @Test(expected = InvalidTopicException.class)
     public void shouldThrowIfNameIsInvalid() {
-        new InternalTopicConfig("foo bar baz", Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
-    }
-
-    @Test
-    public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() {
-        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
-                                                                        Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete),
-                                                                        Collections.<String, String>emptyMap());
-        final int additionalRetentionMs = 20;
-        topicConfig.setRetentionMs(10);
-        final Properties properties = topicConfig.toProperties(additionalRetentionMs);
-        assertEquals("30", properties.getProperty(InternalTopicManager.RETENTION_MS));
-    }
-
-    @Test
-    public void shouldNotConfigureRetentionMsWhenCompact() {
-        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
-                                                                        Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                                                        Collections.<String, String>emptyMap());
-        topicConfig.setRetentionMs(10);
-        final Properties properties = topicConfig.toProperties(0);
-        assertNull(null, properties.getProperty(InternalTopicManager.RETENTION_MS));
+        new RepartitionTopicConfig("foo bar baz", Collections.<String, String>emptyMap());
     }
 
     @Test
-    public void shouldNotConfigureRetentionMsWhenDelete() {
-        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
-                                                                        Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                                                        Collections.<String, String>emptyMap());
+    public void shouldAugmentRetentionMsWithWindowedChangelog() {
+        final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.<String, String>emptyMap());
         topicConfig.setRetentionMs(10);
-        final Properties properties = topicConfig.toProperties(0);
-        assertNull(null, properties.getProperty(InternalTopicManager.RETENTION_MS));
-    }
-
-
-    @Test
-    public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() {
-        assertTrue(new InternalTopicConfig("name",
-                                           Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                           Collections.<String, String>emptyMap()).isCompacted());
-        assertTrue(new InternalTopicConfig("name",
-                                           Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
-                                                       InternalTopicConfig.CleanupPolicy.delete),
-                                           Collections.<String, String>emptyMap()).isCompacted());
-    }
-
-    @Test
-    public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() {
-        assertFalse(new InternalTopicConfig("name",
-                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                            Collections.<String, String>emptyMap()).isCompacted());
-    }
-
-    @Test
-    public void shouldUseCleanupPolicyFromConfigIfSupplied() {
-        final InternalTopicConfig config = new InternalTopicConfig("name",
-                                                                   Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                                                   Collections.singletonMap("cleanup.policy", "compact"));
-
-        final Properties properties = config.toProperties(0);
-        assertEquals("compact", properties.getProperty("cleanup.policy"));
+        assertEquals("30", topicConfig.getProperties(Collections.<String, String>emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG));
     }
 
     @Test
-    public void shouldHavePropertiesSuppliedByUser() {
+    public void shouldUseSuppliedConfigs() {
         final Map<String, String> configs = new HashMap<>();
         configs.put("retention.ms", "1000");
         configs.put("retention.bytes", "10000");
 
-        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
-                                                                 Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                                                 configs);
+        final UnwindowedChangelogTopicConfig topicConfig = new UnwindowedChangelogTopicConfig("name", configs);
 
-        final Properties properties = topicConfig.toProperties(0);
-        assertEquals("1000", properties.getProperty("retention.ms"));
-        assertEquals("10000", properties.getProperty("retention.bytes"));
+        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 0);
+        assertEquals("1000", properties.get("retention.ms"));
+        assertEquals("10000", properties.get("retention.bytes"));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 10d3dd9..3210b23 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.After;
@@ -52,6 +57,7 @@ public class InternalTopicManagerTest {
     };
     private final String topic = "test_topic";
     private final String topic2 = "test_topic_2";
+    private final String topic3 = "test_topic_3";
     private final List<Node> singleReplica = Collections.singletonList(broker1);
 
     private MockAdminClient mockAdminClient;
@@ -63,6 +69,7 @@ public class InternalTopicManagerTest {
             put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + broker1.port());
             put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
             put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 1);
+            put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 16384);
         }
     };
 
@@ -71,7 +78,7 @@ public class InternalTopicManagerTest {
         mockAdminClient = new MockAdminClient(cluster);
         internalTopicManager = new InternalTopicManager(
             mockAdminClient,
-            config);
+            new StreamsConfig(config));
     }
 
     @After
@@ -125,16 +132,42 @@ public class InternalTopicManagerTest {
 
     @Test
     public void shouldCreateRequiredTopics() throws Exception {
-        final InternalTopicConfig topicConfig = new InternalTopicConfig(topic,  Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
+        final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap());
         topicConfig.setNumberOfPartitions(1);
+        final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig(topic2, Collections.<String, String>emptyMap());
+        topicConfig2.setNumberOfPartitions(1);
+        final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig(topic3, Collections.<String, String>emptyMap());
+        topicConfig3.setNumberOfPartitions(1);
+
         internalTopicManager.makeReady(Collections.singletonMap(topic, topicConfig));
+        internalTopicManager.makeReady(Collections.singletonMap(topic2, topicConfig2));
+        internalTopicManager.makeReady(Collections.singletonMap(topic3, topicConfig3));
 
-        assertEquals(Collections.singleton(topic), mockAdminClient.listTopics().names().get());
+        assertEquals(Utils.mkSet(topic, topic2, topic3), mockAdminClient.listTopics().names().get());
         assertEquals(new TopicDescription(topic, false, new ArrayList<TopicPartitionInfo>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
             }
         }), mockAdminClient.describeTopics(Collections.singleton(topic)).values().get(topic).get());
+        assertEquals(new TopicDescription(topic2, false, new ArrayList<TopicPartitionInfo>() {
+            {
+                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+            }
+        }), mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get());
+        assertEquals(new TopicDescription(topic3, false, new ArrayList<TopicPartitionInfo>() {
+            {
+                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+            }
+        }), mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get());
+
+        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+        ConfigResource resource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2);
+        ConfigResource resource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3);
+
+        assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), mockAdminClient.describeConfigs(Collections.singleton(resource)).values().get(resource).get().get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT), mockAdminClient.describeConfigs(Collections.singleton(resource2)).values().get(resource2).get().get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE), mockAdminClient.describeConfigs(Collections.singleton(resource3)).values().get(resource3).get().get(TopicConfig.CLEANUP_POLICY_CONFIG));
+
     }
 
     @Test
@@ -151,7 +184,7 @@ public class InternalTopicManagerTest {
             null);
 
         try {
-            final InternalTopicConfig internalTopicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.<String, String>emptyMap());
+            final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap());
             internalTopicConfig.setNumberOfPartitions(1);
             internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
             fail("Should have thrown StreamsException");
@@ -169,9 +202,9 @@ public class InternalTopicManagerTest {
         // attempt to create it again with replication 1
         final InternalTopicManager internalTopicManager2 = new InternalTopicManager(
             mockAdminClient,
-            config);
+            new StreamsConfig(config));
 
-        final InternalTopicConfig internalTopicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.<String, String>emptyMap());
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         internalTopicManager2.makeReady(Collections.singletonMap(topic, internalTopicConfig));
     }
@@ -185,7 +218,7 @@ public class InternalTopicManagerTest {
     public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
         mockAdminClient.timeoutNextRequest(4);
 
-        final InternalTopicConfig internalTopicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.<String, String>emptyMap());
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         try {
             internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));


Mime
View raw message