kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-3741; allow users to specify default topic configs for internal topics
Date Fri, 21 Jul 2017 11:15:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk beb0ad9bb -> 1110f66fa


KAFKA-3741; allow users to specify default topic configs for internal topics

Allow users to specify default topic configs for streams internal topics by supplying properties
from `TopicConfig` with a prefix.
Supplied defaults are used when creating the internal topics. They are overridden by the configs
supplied along with the `InternalTopicConfig`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3459 from dguy/kafka-3741


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1110f66f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1110f66f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1110f66f

Branch: refs/heads/trunk
Commit: 1110f66fa7278212160c6e8392b6e6ab09560452
Parents: beb0ad9
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Jul 21 12:15:40 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Jul 21 12:15:40 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  39 ++++++
 .../org/apache/kafka/streams/KafkaStreams.java  |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  21 +++-
 .../internals/StreamPartitionAssignor.java      |   2 +-
 .../processor/internals/StreamsKafkaClient.java |  86 +++++++------
 .../internals/InternalTopicManagerTest.java     |   3 +-
 .../internals/StreamsKafkaClientTest.java       | 125 ++++++++++++++++++-
 .../kafka/test/MockInternalTopicManager.java    |   2 +-
 8 files changed, 234 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index a113fd7..27ff81b 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1006,6 +1006,45 @@
         So we can avoid the situation where one instance is assigned all tasks, begins restoring/processing,
only to shortly after be rebalanced, and then have to start again with half of the tasks and
so on.
     </p>
 
+    <h4><a id="streams_topic_config" href="#streams_topic_config">Internal Topic
Configuration</a></h4>
+    <p>
+        Kafka Streams automatically creates internal repartitioning and changelog topics.
+        You can override the default configs used when creating these topics by adding any
configs from <code>TopicConfig</code> to your <code>StreamsConfig</code>
with the prefix <code>StreamsConfig.TOPIC_PREFIX</code>:
+    </p>
+
+    <pre class="brush: java;">
+    Properties settings = new Properties();
+    // Example of a "normal" setting for Kafka Streams
+    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
+
+    // Add a topic config by prefixing with topic.
+    settings.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.SEGMENT_BYTES_CONFIG, 1024 * 1024);
+
+    // Alternatively, you can use
+    settings.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 1024 * 1024);
+    </pre>
+
+    <p>
+        For changelog topics you can also override the default configs on a per store basis.
+        This can be done by using any method overload that has a <code>StateStoreSupplier</code>
as a parameter:
+    </p>
+
+    <pre class="brush: java;">
+        // a map to add topic config
+        Map&lt;String, String&gt; topicConfig = new HashMap&lt;&gt;();
+        topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000");
+
+        StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .persistent()
+                .enableLogging(topicConfig) // pass in the config overrides
+                .build();
+        
+        groupedStream.count(supplier);
+    </pre>
+
+
 
     <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka
Streams Application</a></h4>
     <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 67535d6..028713b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -524,7 +524,7 @@ public class KafkaStreams {
      * @throws StreamsException if brokers have version 0.10.0.x
      */
     private void checkBrokerVersionCompatibility() throws StreamsException {
-        final StreamsKafkaClient client = new StreamsKafkaClient(config);
+        final StreamsKafkaClient client = StreamsKafkaClient.create(config);
 
         client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 9cdfe8a..94ad87b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -99,7 +99,15 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String CONSUMER_PREFIX = "consumer.";
 
-    // Prefix used to isolate producer configs from consumer configs.
+
+    /**
+     * Prefix used to provide default topic configs to be applied when creating internal
topics.
+     * These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig
TopicConfig}.
+     * It is recommended to use {@link #topicPrefix(String)}.
+     */
+    public static final String TOPIC_PREFIX = "topic.";
+
+
     /**
      * Prefix used to isolate {@link KafkaProducer producer} configs from {@link KafkaConsumer
consumer} configs.
      * It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link
ProducerConfig producer
@@ -563,6 +571,17 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Prefix a property with {@link #TOPIC_PREFIX}
+     * used to provide default topic configs to be applied when creating internal topics.
+     *
+     * @param topicProp the topic property to be masked
+     * @return TOPIC_PREFIX + {@code topicProp}
+     */
+    public static String topicPrefix(final String topicProp) {
+        return TOPIC_PREFIX + topicProp;
+    }
+
+    /**
      * Return a copy of the config definition.
      *
      * @return a copy of the config definition

http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 4eadb99..91856b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -239,7 +239,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         internalTopicManager = new InternalTopicManager(
-                new StreamsKafkaClient(this.streamThread.config),
+                StreamsKafkaClient.create(this.streamThread.config),
                 configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer)
configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
                 configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
?
                         (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 148a51d..b24614f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -72,72 +72,88 @@ public class StreamsKafkaClient {
 
     public static class Config extends AbstractConfig {
 
-        public static Config fromStreamsConfig(StreamsConfig streamsConfig) {
+        static Config fromStreamsConfig(StreamsConfig streamsConfig) {
             return new Config(streamsConfig.originals());
         }
 
-        public Config(Map<?, ?> originals) {
+        Config(Map<?, ?> originals) {
             super(CONFIG, originals, false);
         }
-    }
 
+    }
     private final KafkaClient kafkaClient;
     private final List<MetricsReporter> reporters;
     private final Config streamsConfig;
-
+    private final Map<String, String> defaultTopicConfigs = new HashMap<>();
     private static final int MAX_INFLIGHT_REQUESTS = 100;
 
-    public StreamsKafkaClient(final StreamsConfig streamsConfig) {
-        this(Config.fromStreamsConfig(streamsConfig));
-    }
 
-    public StreamsKafkaClient(final Config streamsConfig) {
+    StreamsKafkaClient(final Config streamsConfig,
+                       final KafkaClient kafkaClient,
+                       final List<MetricsReporter> reporters) {
         this.streamsConfig = streamsConfig;
+        this.kafkaClient = kafkaClient;
+        this.reporters = reporters;
+        extractDefaultTopicConfigs(streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX));
+    }
 
+    private void extractDefaultTopicConfigs(final Map<String, Object> configs) {
+        for (final Map.Entry<String, Object> entry : configs.entrySet()) {
+            if (entry.getValue() != null) {
+                defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString());
+            }
+        }
+    }
+
+
+    public static StreamsKafkaClient create(final Config streamsConfig) {
         final Time time = new SystemTime();
 
         final Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", StreamsConfig.CLIENT_ID_CONFIG);
 
         final Metadata metadata = new Metadata(streamsConfig.getLong(
-            StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
-            false
-        );
+                StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
+                                               streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
false);
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
         metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(),
time.milliseconds());
 
         final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                 .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS)
                 .tags(metricTags);
-        reporters = streamsConfig.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
+        final List<MetricsReporter> reporters = streamsConfig.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                                                         MetricsReporter.class);
         // TODO: This should come from the KafkaStream
-        reporters.add(new JmxReporter("kafka.admin"));
+        reporters.add(new JmxReporter("kafka.admin.client"));
         final Metrics metrics = new Metrics(metricConfig, reporters, time);
 
         final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
 
         final Selector selector = new Selector(
-            streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-            metrics,
-            time,
-            "kafka-client",
-            channelBuilder);
-
-        kafkaClient = new NetworkClient(
-            selector,
-            metadata,
-            streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG),
-            MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
-            streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
-            streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
-            streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
-            streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
-            streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),
-            time,
-            true,
-            new ApiVersions());
+                streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                metrics,
+                time,
+                "kafka-client",
+                channelBuilder);
+
+        final KafkaClient kafkaClient = new NetworkClient(
+                selector,
+                metadata,
+                streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG),
+                MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
+                streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
+                streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
+                streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                time,
+                true,
+                new ApiVersions());
+        return new StreamsKafkaClient(streamsConfig, kafkaClient, reporters);
+    }
+
+    public static StreamsKafkaClient create(final StreamsConfig streamsConfig) {
+        return create(Config.fromStreamsConfig(streamsConfig));
     }
 
     public void close() throws IOException {
@@ -161,7 +177,7 @@ public class StreamsKafkaClient {
             InternalTopicConfig internalTopicConfig = entry.getKey();
             Integer partitions = entry.getValue();
             final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
-            final Map<String, String> topicConfig = new HashMap<>();
+            final Map<String, String> topicConfig = new HashMap<>(defaultTopicConfigs);
             for (String key : topicProperties.stringPropertyNames()) {
                 topicConfig.put(key, topicProperties.getProperty(key));
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index aef0d35..1a46356 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -120,7 +121,7 @@ public class InternalTopicManagerTest {
     private class MockStreamKafkaClient extends StreamsKafkaClient {
 
         MockStreamKafkaClient(final StreamsConfig streamsConfig) {
-            super(streamsConfig);
+            super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig), new MockClient(new
MockTime()), Collections.EMPTY_LIST);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
index 2b43ebb..6c4342f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
@@ -16,30 +16,143 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Properties;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 public class StreamsKafkaClientTest {
 
+    private static final String TOPIC = "topic";
+    private final MockClient kafkaClient = new MockClient(new MockTime());
+    private final List<MetricsReporter> reporters = Collections.emptyList();
+    private final MetadataResponse metadata = new MetadataResponse(Collections.singletonList(new
Node(1, "host", 90)), "cluster", 1, Collections.<MetadataResponse.TopicMetadata>emptyList());
+    private final Map<String, Object> config = new HashMap<>();
+    private final InternalTopicConfig topicConfigWithNoOverrides = new InternalTopicConfig(TOPIC,
+                                                                                        
  Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                                        
  Collections.<String, String>emptyMap());
+
+    private final Map<String, String> overridenTopicConfig = Collections.singletonMap(TopicConfig.DELETE_RETENTION_MS_CONFIG,
"100");
+    private final InternalTopicConfig topicConfigWithOverrides = new InternalTopicConfig(TOPIC,
+                                                                                        
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                                        
overridenTopicConfig);
+
+
+    @Before
+    public void before() {
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "some_app_id");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+    }
+
     @Test
     public void testConfigFromStreamsConfig() {
         for (final String expectedMechanism : asList("PLAIN", "SCRAM-SHA-512")) {
-            final Properties props = new Properties();
-            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "some_app_id");
-            props.setProperty(SaslConfigs.SASL_MECHANISM, expectedMechanism);
-            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
-            final StreamsConfig streamsConfig = new StreamsConfig(props);
+            config.put(SaslConfigs.SASL_MECHANISM, expectedMechanism);
+            final StreamsConfig streamsConfig = new StreamsConfig(config);
             final AbstractConfig config = StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig);
             assertEquals(expectedMechanism, config.values().get(SaslConfigs.SASL_MECHANISM));
             assertEquals(expectedMechanism, config.getString(SaslConfigs.SASL_MECHANISM));
         }
     }
 
+    @Test
+    public void shouldAddCleanupPolicyToTopicConfigWhenCreatingTopic() throws Exception {
+        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
+        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy",
"delete"));
+    }
+
+
+    @Test
+    public void shouldAddDefaultTopicConfigFromStreamConfig() throws Exception {
+        config.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "100");
+        config.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG), "gzip");
+
+        final Map<String, String> expectedConfigs = new HashMap<>();
+        expectedConfigs.put(TopicConfig.SEGMENT_MS_CONFIG, "100");
+        expectedConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+        expectedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
+        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
+        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, expectedConfigs);
+    }
+
+    @Test
+    public void shouldSetPropertiesDefinedByInternalTopicConfig() throws Exception {
+        final Map<String, String> expectedConfigs = new HashMap<>(overridenTopicConfig);
+        expectedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
+        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithOverrides, expectedConfigs);
+    }
+
+    @Test
+    public void shouldOverrideDefaultTopicConfigsFromStreamsConfig() throws Exception {
+        config.put(StreamsConfig.topicPrefix(TopicConfig.DELETE_RETENTION_MS_CONFIG), "99999");
+        config.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "988");
+
+        final Map<String, String> expectedConfigs = new HashMap<>(overridenTopicConfig);
+        expectedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        expectedConfigs.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "100");
+        expectedConfigs.put(TopicConfig.SEGMENT_MS_CONFIG, "988");
+        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
+        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithOverrides, expectedConfigs);
+    }
+
+    @Test
+    public void shouldNotAllowNullTopicConfigs() throws Exception {
+        config.put(StreamsConfig.topicPrefix(TopicConfig.DELETE_RETENTION_MS_CONFIG), null);
+        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
+        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy",
"delete"));
+    }
+
+    private void verifyCorrectTopicConfigs(final StreamsKafkaClient streamsKafkaClient,
+                                           final InternalTopicConfig internalTopicConfig,
+                                           final Map<String, String> expectedConfigs)
{
+        final Map<String, String> requestedTopicConfigs = new HashMap<>();
+
+        kafkaClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(final AbstractRequest body) {
+                if (!(body instanceof CreateTopicsRequest)) {
+                    return false;
+                }
+                final CreateTopicsRequest request = (CreateTopicsRequest) body;
+                final Map<String, CreateTopicsRequest.TopicDetails> topics =
+                        request.topics();
+                final CreateTopicsRequest.TopicDetails topicDetails = topics.get(TOPIC);
+                requestedTopicConfigs.putAll(topicDetails.configs);
+                return true;
+            }
+        }, new CreateTopicsResponse(Collections.singletonMap(TOPIC, ApiError.NONE)));
+
+        streamsKafkaClient.createTopics(Collections.singletonMap(internalTopicConfig, 1),
1, 1, metadata);
+
+        assertThat(requestedTopicConfigs, equalTo(expectedConfigs));
+    }
+
+    private StreamsKafkaClient createStreamsKafkaClient() {
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig),
+                                                                             kafkaClient,
+                                                                             reporters);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1110f66f/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index d6ada4b..3908305 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -38,7 +38,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
     private MockConsumer<byte[], byte[]> restoreConsumer;
 
     public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[],
byte[]> restoreConsumer) {
-        super(new StreamsKafkaClient(streamsConfig), 0, 0, new MockTime());
+        super(StreamsKafkaClient.create(streamsConfig), 0, 0, new MockTime());
 
         this.restoreConsumer = restoreConsumer;
     }


Mime
View raw message