kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-4857: Replace StreamsKafkaClient with AdminClient in Kafka Streams
Date Fri, 08 Dec 2017 00:16:59 GMT
KAFKA-4857: Replace StreamsKafkaClient with AdminClient in Kafka Streams

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4242 from mjsax/kafka-4857-admin-client


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/234ec8a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/234ec8a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/234ec8a8

Branch: refs/heads/trunk
Commit: 234ec8a8af76bfb7874dd99714a65089d6048953
Parents: 86e2bc9
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Dec 7 16:16:54 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Dec 7 16:16:54 2017 -0800

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   3 +
 .../kafka/clients/admin/TopicDescription.java   |  20 ++
 .../apache/kafka/common/TopicPartitionInfo.java |  22 ++
 .../kafka/clients/admin/MockAdminClient.java    | 264 ++++++++++++++
 .../org/apache/kafka/streams/KafkaStreams.java  |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java |   3 +-
 .../internals/InternalTopicConfig.java          |  23 +-
 .../internals/InternalTopicManager.java         | 263 ++++++++++----
 .../internals/StreamPartitionAssignor.java      |  44 +--
 .../streams/processor/internals/StreamTask.java |   2 +
 .../processor/internals/StreamThread.java       |  12 +-
 .../processor/internals/StreamsKafkaClient.java | 357 -------------------
 .../processor/internals/TaskManager.java        |   9 +-
 .../apache/kafka/streams/StreamsConfigTest.java |  44 ++-
 .../internals/InternalTopicConfigTest.java      |   5 +-
 .../internals/InternalTopicManagerTest.java     | 252 +++++++------
 .../internals/StreamPartitionAssignorTest.java  |  28 +-
 .../processor/internals/StreamThreadTest.java   |   4 -
 .../internals/StreamsKafkaClientTest.java       | 219 ------------
 .../processor/internals/TaskManagerTest.java    |   6 +-
 .../streams/tests/BrokerCompatibilityTest.java  |   1 +
 .../kafka/test/MockInternalTopicManager.java    |  28 +-
 .../streams_broker_compatibility_test.py        |   4 +-
 23 files changed, 744 insertions(+), 871 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6218794..f610e88 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -69,6 +69,9 @@
     <suppress checks="ClassFanOutComplexity"
               files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/>
 
+    <suppress checks="ClassFanOutComplexity"
+              files="MockAdminClient.java"/>
+
     <suppress checks="JavaNCSS"
               files="RequestResponseTest.java"/>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index c220892..4e3e59a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -30,6 +30,26 @@ public class TopicDescription {
     private final boolean internal;
     private final List<TopicPartitionInfo> partitions;
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TopicDescription that = (TopicDescription) o;
+
+        if (internal != that.internal) return false;
+        if (name != null ? !name.equals(that.name) : that.name != null) return false;
+        return partitions != null ? partitions.equals(that.partitions) : that.partitions == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + (internal ? 1 : 0);
+        result = 31 * result + (partitions != null ? partitions.hashCode() : 0);
+        return result;
+    }
+
     /**
      * Create an instance with the specified parameters.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
index be69318..7edf714 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
@@ -82,4 +82,26 @@ public class TopicPartitionInfo {
         return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
             Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TopicPartitionInfo that = (TopicPartitionInfo) o;
+
+        if (partition != that.partition) return false;
+        if (leader != null ? !leader.equals(that.leader) : that.leader != null) return false;
+        if (replicas != null ? !replicas.equals(that.replicas) : that.replicas != null) return false;
+        return isr != null ? isr.equals(that.isr) : that.isr == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = partition;
+        result = 31 * result + (leader != null ? leader.hashCode() : 0);
+        result = 31 * result + (replicas != null ? replicas.hashCode() : 0);
+        result = 31 * result + (isr != null ? isr.hashCode() : 0);
+        return result;
+    }
 }
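
The equals()/hashCode() overrides added above make TopicDescription and TopicPartitionInfo comparable by value, which the reworked tests rely on when asserting against AdminClient results. A minimal sketch of what this enables (broker, topic name, and class name are illustrative, not part of this patch):

    import java.util.Collections;

    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartitionInfo;

    public class DescriptionEqualityExample {
        public static void main(final String[] args) {
            final Node broker = new Node(0, "localhost", 9092);
            final TopicPartitionInfo p0 = new TopicPartitionInfo(
                0, broker, Collections.singletonList(broker), Collections.singletonList(broker));

            // two descriptions built independently now compare equal by value
            final TopicDescription expected = new TopicDescription("changelog", false, Collections.singletonList(p0));
            final TopicDescription actual = new TopicDescription("changelog", false, Collections.singletonList(p0));

            System.out.println(expected.equals(actual));                   // true
            System.out.println(expected.hashCode() == actual.hashCode());  // true
        }
    }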

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
new file mode 100644
index 0000000..11fe428
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class MockAdminClient extends AdminClient {
+    private final List<Node> brokers;
+    private final Map<String, TopicMetadata> allTopics = new HashMap<>();
+
+    private int timeoutNextRequests = 0;
+
+    public MockAdminClient(List<Node> brokers) {
+        this.brokers = brokers;
+    }
+
+    public void addTopic(boolean internal,
+                         String name,
+                         List<TopicPartitionInfo> partitions,
+                         Map<String, String> configs) {
+        if (allTopics.containsKey(name)) {
+            throw new IllegalArgumentException(String.format("Topic %s was already added.", name));
+        }
+        List<Node> replicas = null;
+        for (TopicPartitionInfo partition : partitions) {
+            if (!brokers.contains(partition.leader())) {
+                throw new IllegalArgumentException("Leader broker unknown");
+            }
+            if (!brokers.containsAll(partition.replicas())) {
+                throw new IllegalArgumentException("Unknown brokers in replica list");
+            }
+            if (!brokers.containsAll(partition.isr())) {
+                throw new IllegalArgumentException("Unknown brokers in isr list");
+            }
+
+            if (replicas == null) {
+                replicas = partition.replicas();
+            } else if (!replicas.equals(partition.replicas())) {
+                throw new IllegalArgumentException("All partitions need to have the same replica nodes.");
+            }
+        }
+
+        allTopics.put(name, new TopicMetadata(internal, partitions, configs));
+    }
+
+    public void timeoutNextRequest(int numberOfRequest) {
+        timeoutNextRequests = numberOfRequest;
+    }
+
+    @Override
+    public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
+        Map<String, KafkaFuture<Void>> createTopicResult = new HashMap<>();
+
+        if (timeoutNextRequests > 0) {
+            for (final NewTopic newTopic : newTopics) {
+                String topicName = newTopic.name();
+
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                createTopicResult.put(topicName, future);
+            }
+
+            --timeoutNextRequests;
+            return new CreateTopicsResult(createTopicResult);
+        }
+
+        for (final NewTopic newTopic : newTopics) {
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+
+            String topicName = newTopic.name();
+            if (allTopics.containsKey(topicName)) {
+                future.completeExceptionally(new TopicExistsException(String.format("Topic %s exists already.", topicName)));
+                createTopicResult.put(topicName, future);
+                continue;  // keep the failed future; don't re-create the topic below
+            }
+            int replicationFactor = newTopic.replicationFactor();
+            List<Node> replicas = new ArrayList<>(replicationFactor);
+            for (int i = 0; i < replicationFactor; ++i) {
+                replicas.add(brokers.get(i));
+            }
+
+            int numberOfPartitions = newTopic.numPartitions();
+            List<TopicPartitionInfo> partitions = new ArrayList<>(numberOfPartitions);
+            for (int p = 0; p < numberOfPartitions; ++p) {
+                partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.<Node>emptyList()));
+            }
+            allTopics.put(topicName, new TopicMetadata(false, partitions, newTopic.convertToTopicDetails().configs));
+            future.complete(null);
+            createTopicResult.put(topicName, future);
+        }
+
+        return new CreateTopicsResult(createTopicResult);
+    }
+
+    @Override
+    public ListTopicsResult listTopics(ListTopicsOptions options) {
+        Map<String, TopicListing> topicListings = new HashMap<>();
+
+        if (timeoutNextRequests > 0) {
+            KafkaFutureImpl<Map<String, TopicListing>> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new TimeoutException());
+
+            --timeoutNextRequests;
+            return new ListTopicsResult(future);
+        }
+
+        for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
+            String topicName = topicDescription.getKey();
+            topicListings.put(topicName, new TopicListing(topicName, topicDescription.getValue().isInternalTopic));
+        }
+
+        KafkaFutureImpl<Map<String, TopicListing>> future = new KafkaFutureImpl<>();
+        future.complete(topicListings);
+        return new ListTopicsResult(future);
+    }
+
+    @Override
+    public DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options) {
+        Map<String, KafkaFuture<TopicDescription>> topicDescriptions = new HashMap<>();
+
+        if (timeoutNextRequests > 0) {
+            for (String requestedTopic : topicNames) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                topicDescriptions.put(requestedTopic, future);
+            }
+
+            --timeoutNextRequests;
+            return new DescribeTopicsResult(topicDescriptions);
+        }
+
+        for (String requestedTopic : topicNames) {
+            for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
+                String topicName = topicDescription.getKey();
+                if (topicName.equals(requestedTopic)) {
+                    TopicMetadata topicMetadata = topicDescription.getValue();
+                    KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                    future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions));
+                    topicDescriptions.put(topicName, future);
+                    break;
+                }
+            }
+            if (!topicDescriptions.containsKey(requestedTopic)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new UnknownTopicOrPartitionException(
+                    String.format("Topic %s unknown.", requestedTopic)));
+                topicDescriptions.put(requestedTopic, future);
+            }
+        }
+
+        return new DescribeTopicsResult(topicDescriptions);
+    }
+
+    @Override
+    public DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete, DeleteRecordsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void close(long duration, TimeUnit unit) {}
+
+
+    private final static class TopicMetadata {
+        final boolean isInternalTopic;
+        final List<TopicPartitionInfo> partitions;
+        final Map<String, String> configs;
+
+        TopicMetadata(boolean isInternalTopic,
+                      List<TopicPartitionInfo> partitions,
+                      Map<String, String> configs) {
+            this.isInternalTopic = isInternalTopic;
+            this.partitions = partitions;
+            this.configs = configs != null ? configs : Collections.<String, String>emptyMap();
+        }
+    }
+
+}
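
MockAdminClient gives tests an in-memory stand-in for a broker-backed AdminClient: createTopics() and describeTopics() operate on the allTopics map, and timeoutNextRequest() injects TimeoutExceptions to exercise retry paths. A hedged usage sketch (broker, topic name, and class name are illustrative):

    import java.util.Collections;

    import org.apache.kafka.clients.admin.MockAdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.Node;

    public class MockAdminClientExample {
        public static void main(final String[] args) throws Exception {
            final Node broker = new Node(0, "localhost", 9092);
            final MockAdminClient admin = new MockAdminClient(Collections.singletonList(broker));

            // createTopics() stores the topic in the mock's in-memory map
            admin.createTopics(Collections.singleton(new NewTopic("changelog", 1, (short) 1))).all().get();

            // describeTopics() answers from the same map
            final TopicDescription description =
                admin.describeTopics(Collections.singleton("changelog")).values().get("changelog").get();
            System.out.println(description.partitions().size()); // 1

            // fail the next request with a TimeoutException to exercise retry logic
            admin.timeoutNextRequest(1);
            admin.close();
        }
    }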

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 1844cde..641455b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -651,7 +651,7 @@ public class KafkaStreams {
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
-        this.adminClient = clientSupplier.getAdminClient(config.getAdminConfigs(clientId));
+        adminClient = clientSupplier.getAdminClient(config.getAdminConfigs(clientId));
 
         final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 26919c6..49b8a3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -740,8 +740,9 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
         consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
         consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
-
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
+        final AdminClientConfig config = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
+        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), config.getInt(AdminClientConfig.RETRIES_CONFIG));
 
         return consumerProps;
     }
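
The two added lines forward the admin client's retries setting into the consumer configs, so the InternalTopicManager that the StreamPartitionAssignor creates (see that diff further down) sees the same value. Roughly how an application would raise it, using only the public StreamsConfig API (property values are illustrative):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class AdminRetriesExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // the "admin." prefix routes the setting to the embedded AdminClient
            props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), 10);

            final StreamsConfig config = new StreamsConfig(props);
            // the retries value shows up in the admin configs (and, per this patch,
            // is also forwarded into the consumer configs for the assignor)
            System.out.println(config.getAdminConfigs("example-app-admin"));
        }
    }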

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index 7931f32..c6758d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -31,17 +31,20 @@ public class InternalTopicConfig {
     public enum CleanupPolicy { compact, delete }
 
     private final String name;
+    private int numberOfPartitions = -1;
     private final Map<String, String> logConfig;
     private final Set<CleanupPolicy> cleanupPolicies;
 
     private Long retentionMs;
 
-    public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies, final Map<String, String> logConfig) {
+    public InternalTopicConfig(final String name,
+                               final Set<CleanupPolicy> defaultCleanupPolicies,
+                               final Map<String, String> logConfig) {
         Objects.requireNonNull(name, "name can't be null");
         Topic.validate(name);
 
         if (defaultCleanupPolicies.isEmpty()) {
-            throw new IllegalArgumentException("Must provide at least one cleanup policy");
+            throw new IllegalArgumentException("Must provide at least one cleanup policy.");
         }
         this.name = name;
         this.cleanupPolicies = defaultCleanupPolicies;
@@ -91,7 +94,21 @@ public class InternalTopicConfig {
         return name;
     }
 
-    public void setRetentionMs(final long retentionMs) {
+    public int numberOfPartitions() {
+        if (numberOfPartitions == -1) {
+            throw new IllegalStateException("Number of partitions not specified.");
+        }
+        return numberOfPartitions;
+    }
+
+    void setNumberOfPartitions(final int numberOfPartitions) {
+        if (numberOfPartitions < 1) {
+            throw new IllegalArgumentException("Number of partitions must be at least 1.");
+        }
+        this.numberOfPartitions = numberOfPartitions;
+    }
+
+    void setRetentionMs(final long retentionMs) {
         if (!logConfig.containsKey(InternalTopicManager.RETENTION_MS)) {
             this.retentionMs = retentionMs;
         }
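
With the partition count now carried on InternalTopicConfig itself, the assignor sets it and the topic manager reads it back, replacing the separate Map<InternalTopicConfig, Integer>. A sketch of that handshake (setNumberOfPartitions() is package-private, so this would live in the internals package; names are illustrative):

    package org.apache.kafka.streams.processor.internals;

    import java.util.Collections;

    public class PartitionHandshakeExample {
        public static void main(final String[] args) {
            final InternalTopicConfig changelog = new InternalTopicConfig(
                "app-store-changelog",
                Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                Collections.<String, String>emptyMap());

            changelog.setNumberOfPartitions(4);                  // done by StreamPartitionAssignor
            System.out.println(changelog.numberOfPartitions());  // read back by InternalTopicManager: 4

            // calling numberOfPartitions() before the setter would throw IllegalStateException
        }
    }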

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index f8d4eec..cae3128 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -16,44 +16,76 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public class InternalTopicManager {
 
-    static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+    private static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 
-    public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
-    public static final String RETENTION_MS = "retention.ms";
-    private static final int MAX_TOPIC_READY_TRY = 5;
+    public final static String CLEANUP_POLICY_PROP = "cleanup.policy";
+    public final static String RETENTION_MS = "retention.ms";
+
+    private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " +
+        "Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
 
     private final Logger log;
-    private final Time time;
     private final long windowChangeLogAdditionalRetention;
+    private final Map<String, String> defaultTopicConfigs = new HashMap<>();
+
+    private final short replicationFactor;
+    private final AdminClient adminClient;
 
-    private final int replicationFactor;
-    private final StreamsKafkaClient streamsKafkaClient;
+    private final int retries;
 
-    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient,
-                                final int replicationFactor,
-                                final long windowChangeLogAdditionalRetention,
-                                final Time time) {
-        this.streamsKafkaClient = streamsKafkaClient;
-        this.replicationFactor = replicationFactor;
-        this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
-        this.time = time;
+    public InternalTopicManager(final AdminClient adminClient,
+                                final Map<String, ?> config) {
+        this.adminClient = adminClient;
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
 
         LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
-        this.log = logContext.logger(getClass());
+        log = logContext.logger(getClass());
+
+        replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
+        windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
+        retries = new AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
+
+        log.debug("Configs:" + Utils.NL,
+            "\t{} = {}" + Utils.NL,
+            "\t{} = {}" + Utils.NL,
+            "\t{} = {}",
+            AdminClientConfig.RETRIES_CONFIG, retries,
+            StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor,
+            StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, windowChangeLogAdditionalRetention);
+
+        for (final Map.Entry<String, Object> entry : streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).entrySet()) {
+            if (entry.getValue() != null) {
+                defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString());
+            }
+        }
     }
 
     /**
@@ -63,83 +95,172 @@ public class InternalTopicManager {
      * If a topic with the correct number of partitions exists ignores it.
      * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
      */
-    public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
-        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
-            try {
-                final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
-                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
-                final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
-                if (topicsToBeCreated.size() > 0) {
-                    if (metadata.brokers().size() < replicationFactor) {
-                        throw new StreamsException("Found only " + metadata.brokers().size() + " brokers, " +
-                            " but replication factor is " + replicationFactor + "." +
-                            " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\"" +
-                            " or add more brokers to your cluster.");
+    public void makeReady(final Map<String, InternalTopicConfig> topics) {
+        final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet(), true);
+        final Set<InternalTopicConfig> topicsToBeCreated = validateTopicPartitions(topics.values(), existingTopicPartitions);
+        if (topicsToBeCreated.size() > 0) {
+            final Set<NewTopic> newTopics = new HashSet<>();
+
+            for (final InternalTopicConfig internalTopicConfig : topicsToBeCreated) {
+                final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
+                final Map<String, String> topicConfig = new HashMap<>(defaultTopicConfigs);
+                for (final String key : topicProperties.stringPropertyNames()) {
+                    topicConfig.put(key, topicProperties.getProperty(key));
+                }
+                newTopics.add(
+                    new NewTopic(
+                        internalTopicConfig.name(),
+                        internalTopicConfig.numberOfPartitions(),
+                        replicationFactor)
+                    .configs(topicConfig));
+            }
+
+            int remainingRetries = retries;
+            boolean retry;
+            do {
+                retry = false;
+
+                final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
+
+                final Set<String> createTopicNames = new HashSet<>();
+                for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
+                    try {
+                        createTopicResult.getValue().get();
+                        createTopicNames.add(createTopicResult.getKey());
+                    } catch (final ExecutionException couldNotCreateTopic) {
+                        final Throwable cause = couldNotCreateTopic.getCause();
+                        final String topicName = createTopicResult.getKey();
+
+                        if (cause instanceof TimeoutException) {
+                            retry = true;
+                            log.debug("Could not get number of partitions for topic {} due to timeout. " +
+                                "Will try again (remaining retries {}).", topicName, remainingRetries - 1);
+                        } else if (cause instanceof TopicExistsException) {
+                            createTopicNames.add(createTopicResult.getKey());
+                            log.info(String.format("Topic %s exist already: %s",
+                                topicName,
+                                couldNotCreateTopic.getMessage()));
+                        } else {
+                            throw new StreamsException(String.format("Could not create topic %s.", topicName),
+                                couldNotCreateTopic);
+                        }
+                    } catch (final InterruptedException fatalException) {
+                        Thread.currentThread().interrupt();
+                        log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
                     }
-                    streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
                 }
+
+                if (retry) {
+                    final Iterator<NewTopic> it = newTopics.iterator();
+                    while (it.hasNext()) {
+                        if (createTopicNames.contains(it.next().name())) {
+                            it.remove();
+                        }
+                    }
+
+                    continue;
+                }
+
                 return;
-            } catch (StreamsException ex) {
-                log.warn("Could not create internal topics: {} Retry #{}", ex.getMessage(), i);
-            }
-            // backoff
-            time.sleep(100L);
+            } while (remainingRetries-- > 0);
+
+            final String timeoutAndRetryError = "Could not create topics. " +
+                "This can happen if the Kafka cluster is temporary not available. " +
+                "You can increase admin client config `retries` to be resilient against this error.";
+            log.error(timeoutAndRetryError);
+            throw new StreamsException(timeoutAndRetryError);
         }
-        throw new StreamsException("Could not create internal topics.");
     }
 
     /**
      * Get the number of partitions for the given topics
      */
     public Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
-            try {
-                final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
-                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
-                existingTopicPartitions.keySet().retainAll(topics);
-
-                return existingTopicPartitions;
-            } catch (StreamsException ex) {
-                log.warn("Could not get number of partitions: {} Retry #{}", ex.getMessage(), i);
+        return getNumPartitions(topics, false);
+    }
+
+    private Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                  final boolean bestEffort) {
+        int remainingRetries = retries;
+        boolean retry;
+        do {
+            retry = false;
+
+            final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+            final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
+
+            final Map<String, Integer> existingNumberOfPartitionsPerTopic = new HashMap<>();
+            for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
+                try {
+                    final TopicDescription topicDescription = topicFuture.getValue().get();
+                    existingNumberOfPartitionsPerTopic.put(
+                        topicFuture.getKey(),
+                        topicDescription.partitions().size());
+                } catch (final InterruptedException fatalException) {
+                    Thread.currentThread().interrupt();
+                    log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                    throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                } catch (final ExecutionException couldNotDescribeTopicException) {
+                    final Throwable cause = couldNotDescribeTopicException.getCause();
+                    if (cause instanceof TimeoutException) {
+                        retry = true;
+                        log.debug("Could not get number of partitions for topic {} due to timeout. " +
+                            "Will try again (remaining retries {}).", topicFuture.getKey(), remainingRetries - 1);
+                    } else {
+                        final String error = "Could not get number of partitions for topic {}.";
+                        if (bestEffort) {
+                            log.debug(error, topicFuture.getKey(), cause.getMessage());
+                        } else {
+                            log.error(error, topicFuture.getKey(), cause);
+                            throw new StreamsException(cause);
+                        }
+                    }
+                }
             }
-            // backoff
-            time.sleep(100L);
+
+            if (retry) {
+                topics.removeAll(existingNumberOfPartitionsPerTopic.keySet());
+                continue;
+            }
+
+            return existingNumberOfPartitionsPerTopic;
+        } while (remainingRetries-- > 0);
+
+        if (bestEffort) {
+            return Collections.emptyMap();
         }
-        throw new StreamsException("Could not get number of partitions.");
+
+        final String timeoutAndRetryError = "Could not get number of partitions from brokers. " +
+            "This can happen if the Kafka cluster is temporary not available. " +
+            "You can increase admin client config `retries` to be resilient against this error.";
+        log.error(timeoutAndRetryError);
+        throw new StreamsException(timeoutAndRetryError);
     }
 
     /**
      * Check the existing topics to have correct number of partitions; and return the non existing topics to be created
      */
-    private Map<InternalTopicConfig, Integer> validateTopicPartitions(final Map<InternalTopicConfig, Integer> topicsPartitionsMap,
-                                                                      final Map<String, Integer> existingTopicNamesPartitions) {
-        final Map<InternalTopicConfig, Integer> topicsToBeCreated = new HashMap<>();
-        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsPartitionsMap.entrySet()) {
-            InternalTopicConfig topic = entry.getKey();
-            Integer partition = entry.getValue();
+    private Set<InternalTopicConfig> validateTopicPartitions(final Collection<InternalTopicConfig> topicsPartitionsMap,
+                                                             final Map<String, Integer> existingTopicNamesPartitions) {
+        final Set<InternalTopicConfig> topicsToBeCreated = new HashSet<>();
+        for (final InternalTopicConfig topic : topicsPartitionsMap) {
+            final Integer numberOfPartitions = topic.numberOfPartitions();
             if (existingTopicNamesPartitions.containsKey(topic.name())) {
-                if (!existingTopicNamesPartitions.get(topic.name()).equals(partition)) {
-                    throw new StreamsException("Existing internal topic " + topic.name() + " has invalid partitions." +
-                            " Expected: " + partition + " Actual: " + existingTopicNamesPartitions.get(topic.name()) +
-                            ". Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.");
+                if (!existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions)) {
+                    final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " +
+                            "expected: %d; actual: %d. " +
+                            "Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.",
+                        topic.name(), numberOfPartitions, existingTopicNamesPartitions.get(topic.name()));
+                    log.error(errorMsg);
+                    throw new StreamsException(errorMsg);
                 }
             } else {
-                topicsToBeCreated.put(topic, partition);
+                topicsToBeCreated.add(topic);
             }
         }
 
         return topicsToBeCreated;
     }
 
-    private Map<String, Integer> fetchExistingPartitionCountByTopic(final MetadataResponse metadata) {
-        // The names of existing topics and corresponding partition counts
-        final Map<String, Integer> existingPartitionCountByTopic = new HashMap<>();
-        final Collection<MetadataResponse.TopicMetadata> topicsMetadata = metadata.topicMetadata();
-
-        for (MetadataResponse.TopicMetadata topicMetadata: topicsMetadata) {
-            existingPartitionCountByTopic.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());
-        }
-
-        return existingPartitionCountByTopic;
-    }
 }
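
makeReady() and getNumPartitions() now share one retry discipline: retry only when the failure cause is a TimeoutException, drop already-succeeded topics from the next attempt, and give up once the admin client's retries budget is spent. The loop reduced to a generic skeleton (plain Java, not Kafka API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeoutException;

    public class RetryLoopSketch {
        static <T> T withRetries(final int retries, final Callable<T> request) throws Exception {
            int remainingRetries = retries;
            do {
                try {
                    return request.call();          // success: stop retrying
                } catch (final TimeoutException timeout) {
                    // transient failure: loop again while the retry budget lasts
                }
            } while (remainingRetries-- > 0);
            throw new Exception("Could not complete request; consider increasing `retries`.");
        }

        public static void main(final String[] args) throws Exception {
            System.out.println(withRetries(3, () -> "ok")); // ok
        }
    }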

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index ec42a86..57c69c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -56,7 +55,6 @@ import static org.apache.kafka.common.utils.Utils.getPort;
 
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
-    private Time time = Time.SYSTEM;
     private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
 
@@ -179,21 +177,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private CopartitionedTopicsValidator copartitionedTopicsValidator;
 
     /**
-     * Package-private method to set the time. Used for tests.
-     * @param time Time to be used.
-     */
-    void time(final Time time) {
-        this.time = time;
-    }
-
-    /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
      * since the former needs later's cached metadata while sending subscriptions,
      * and the latter needs former's returned assignment when adding tasks.
      * @throws KafkaException if the stream thread is not specified
      */
     @Override
-    public void configure(Map<String, ?> configs) {
+    public void configure(final Map<String, ?> configs) {
         final StreamsConfig streamsConfig = new StreamsConfig(configs);
 
         // Setting the logger with the passed in client thread name
@@ -238,11 +228,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             this.userEndPoint = userEndPoint;
         }
 
-        internalTopicManager = new InternalTopicManager(
-                taskManager.streamsKafkaClient,
-                streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG),
-                streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG),
-                time);
+        internalTopicManager = new InternalTopicManager(taskManager.adminClient, configs);
 
         copartitionedTopicsValidator = new CopartitionedTopicsValidator(logPrefix);
     }
@@ -626,12 +612,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         log.debug("Starting to validate internal topics in partition assignor.");
 
         // first construct the topics to make ready
-        Map<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<>();
-        Set<String> topicNamesToMakeReady = new HashSet<>();
+        final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>();
 
-        for (InternalTopicMetadata metadata : topicPartitions.values()) {
-            InternalTopicConfig topic = metadata.config;
-            Integer numPartitions = metadata.numPartitions;
+        for (final InternalTopicMetadata metadata : topicPartitions.values()) {
+            final InternalTopicConfig topic = metadata.config;
+            final Integer numPartitions = metadata.numPartitions;
 
             if (numPartitions == NOT_AVAILABLE) {
                 continue;
@@ -640,18 +625,19 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
             }
 
-            topicsToMakeReady.put(topic, numPartitions);
-            topicNamesToMakeReady.add(topic.name());
+            topic.setNumberOfPartitions(numPartitions);
+            topicsToMakeReady.put(topic.name(), topic);
         }
 
         if (!topicsToMakeReady.isEmpty()) {
             internalTopicManager.makeReady(topicsToMakeReady);
 
             // wait until each one of the topic metadata has been propagated to at least one broker
-            while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
+            while (!allTopicsCreated(topicsToMakeReady)) {
                 try {
                     Thread.sleep(50L);
                 } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                     // ignore
                 }
             }
@@ -660,11 +646,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         log.debug("Completed validating internal topics in partition assignor.");
     }
 
-    private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {
-        final Map<String, Integer> partitions = internalTopicManager.getNumPartitions(topicNamesToMakeReady);
-        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsToMakeReady.entrySet()) {
-            final Integer numPartitions = partitions.get(entry.getKey().name());
-            if (numPartitions == null || !numPartitions.equals(entry.getValue())) {
+    private boolean allTopicsCreated(final Map<String, InternalTopicConfig> topicsToMakeReady) {
+        final Map<String, Integer> partitions = internalTopicManager.getNumPartitions(topicsToMakeReady.keySet());
+        for (final InternalTopicConfig topic : topicsToMakeReady.values()) {
+            final Integer numPartitions = partitions.get(topic.name());
+            if (numPartitions == null || !numPartitions.equals(topic.numberOfPartitions())) {
                 return false;
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5460c0e..55456d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -97,6 +97,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @param config                the {@link StreamsConfig} specified by the user
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
+     * @param cache                 the {@link ThreadCache} created by the thread
+     * @param time                  the system {@link Time} of the thread
      * @param producer              the instance of {@link Producer} used to produce records
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d930a67..a9786f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -560,8 +560,6 @@ public class StreamThread extends Thread {
     private final Object stateLock;
     private final Logger log;
     private final String logPrefix;
-    // TODO: adminClient will be passeed to taskManager to be accessed in StreamPartitionAssignor
-    private final AdminClient adminClient;
     private final TaskManager taskManager;
     private final StreamsMetricsThreadImpl streamsMetrics;
 
@@ -620,8 +618,6 @@ public class StreamThread extends Thread {
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
-        final StreamsKafkaClient streamsKafkaClient = StreamsKafkaClient.create(config.originals());
-
         final AbstractTaskCreator<StreamTask> activeTaskCreator = new TaskCreator(builder,
                                                                                   config,
                                                                                   streamsMetrics,
@@ -649,7 +645,6 @@ public class StreamThread extends Thread {
                                                   streamsMetadataState,
                                                   activeTaskCreator,
                                                   standbyTaskCreator,
-                                                  streamsKafkaClient,
                                                   adminClient,
                                                   new AssignedStreamsTasks(logContext),
                                                   new AssignedStandbyTasks(logContext));
@@ -671,7 +666,6 @@ public class StreamThread extends Thread {
                 restoreConsumer,
                 consumer,
                 originalReset,
-                adminClient,
                 taskManager,
                 streamsMetrics,
                 builder,
@@ -684,7 +678,6 @@ public class StreamThread extends Thread {
                         final Consumer<byte[], byte[]> restoreConsumer,
                         final Consumer<byte[], byte[]> consumer,
                         final String originalReset,
-                        final AdminClient adminClient,
                         final TaskManager taskManager,
                         final StreamsMetricsThreadImpl streamsMetrics,
                         final InternalTopologyBuilder builder,
@@ -705,7 +698,6 @@ public class StreamThread extends Thread {
         this.restoreConsumer = restoreConsumer;
         this.consumer = consumer;
         this.originalReset = originalReset;
-        this.adminClient = adminClient;
 
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@@ -716,8 +708,8 @@ public class StreamThread extends Thread {
     /**
      * Execute the stream processors
      *
-     * @throws KafkaException for any Kafka-related exceptions
-     * @throws Exception      for any other non-Kafka exceptions
+     * @throws KafkaException    for any Kafka-related exceptions
+     * @throws RuntimeException  for any other non-Kafka exceptions
      */
     @Override
     public void run() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
deleted file mode 100644
index 1e21878..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.clients.ClientRequest;
-import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.network.ChannelBuilder;
-import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateTopicsRequest;
-import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.MetadataRequest;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.BrokerNotFoundException;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-public class StreamsKafkaClient {
-
-    private static final ConfigDef CONFIG = StreamsConfig.configDef()
-            .withClientSslSupport()
-            .withClientSaslSupport();
-
-    public static class Config extends AbstractConfig {
-
-        static Config fromStreamsConfig(Map<String, ?> props) {
-            return new Config(props);
-        }
-
-        Config(Map<?, ?> originals) {
-            super(CONFIG, originals, false);
-        }
-
-    }
-    private final KafkaClient kafkaClient;
-    private final List<MetricsReporter> reporters;
-    private final Config streamsConfig;
-    private final Logger log;
-    private final Map<String, String> defaultTopicConfigs = new HashMap<>();
-    private static final int MAX_INFLIGHT_REQUESTS = 100;
-
-    StreamsKafkaClient(final Config streamsConfig,
-                       final KafkaClient kafkaClient,
-                       final List<MetricsReporter> reporters,
-                       final LogContext log) {
-        this.streamsConfig = streamsConfig;
-        this.kafkaClient = kafkaClient;
-        this.reporters = reporters;
-        this.log = log.logger(StreamsKafkaClient.class);
-        extractDefaultTopicConfigs(streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX));
-    }
-
-    private void extractDefaultTopicConfigs(final Map<String, Object> configs) {
-        for (final Map.Entry<String, Object> entry : configs.entrySet()) {
-            if (entry.getValue() != null) {
-                defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString());
-            }
-        }
-    }
-
-
-    public static StreamsKafkaClient create(final Config streamsConfig) {
-        final Time time = new SystemTime();
-
-        final Map<String, String> metricTags = new LinkedHashMap<>();
-        final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
-        metricTags.put("client-id", clientId);
-
-        final Metadata metadata = new Metadata(streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-                                               streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
-                                               false);
-        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
-
-        final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
-                .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
-                .tags(metricTags);
-        final List<MetricsReporter> reporters = streamsConfig.getConfiguredInstances(
-            ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-            MetricsReporter.class);
-        // TODO: This should come from KafkaStreams
-        reporters.add(new JmxReporter("kafka.admin.client"));
-        final Metrics metrics = new Metrics(metricConfig, reporters, time);
-
-        final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
-        final LogContext logContext = createLogContext(clientId);
-
-        final Selector selector = new Selector(
-                streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                metrics,
-                time,
-                "kafka-client",
-                channelBuilder,
-                logContext);
-
-        final KafkaClient kafkaClient = new NetworkClient(
-                selector,
-                metadata,
-                clientId,
-                MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
-                streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
-                streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
-                streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
-                streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                time,
-                true,
-                new ApiVersions(),
-                logContext);
-        return new StreamsKafkaClient(streamsConfig, kafkaClient, reporters, logContext);
-    }
-
-    private static LogContext createLogContext(String clientId) {
-        return new LogContext("[StreamsKafkaClient clientId=" + clientId + "] ");
-    }
-
-    public static StreamsKafkaClient create(final Map<String, ?> props) {
-        return create(Config.fromStreamsConfig(props));
-    }
-
-    public void close() {
-        try {
-            kafkaClient.close();
-        } catch (final IOException impossible) {
-            // this can actually never happen, because NetworkClient doesn't throw any exception on close()
-            // we log just in case
-            log.error("This error indicates a bug in the code. Please report to dev@kafka.apache.org.", impossible);
-        } finally {
-            for (MetricsReporter metricsReporter: this.reporters) {
-                metricsReporter.close();
-            }
-        }
-    }
-
-    /**
-     * Create a set of new topics using batch request.
-     *
-     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
-     * @throws TimeoutException if there was no response within {@code request.timeout.ms}
-     * @throws StreamsException for any other fatal error
-     */
-    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
-                             final int replicationFactor,
-                             final long windowChangeLogAdditionalRetention,
-                             final MetadataResponse metadata) {
-        final Map<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new HashMap<>();
-        for (final Map.Entry<InternalTopicConfig, Integer> entry : topicsMap.entrySet()) {
-            final InternalTopicConfig internalTopicConfig = entry.getKey();
-            final Integer partitions = entry.getValue();
-            final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
-            final Map<String, String> topicConfig = new HashMap<>(defaultTopicConfigs);
-            for (final String key : topicProperties.stringPropertyNames()) {
-                topicConfig.put(key, topicProperties.getProperty(key));
-            }
-            final CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(
-                partitions,
-                (short) replicationFactor,
-                topicConfig);
-
-            topicRequestDetails.put(internalTopicConfig.name(), topicDetails);
-        }
-
-        final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getControllerReadyBrokerId(metadata),
-            new CreateTopicsRequest.Builder(
-                topicRequestDetails,
-                streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG)),
-            Time.SYSTEM.milliseconds(),
-            true);
-        final ClientResponse clientResponse = sendRequestSync(clientRequest);
-
-        if (!clientResponse.hasResponse()) {
-            throw new StreamsException("Empty response for client request.");
-        }
-        if (!(clientResponse.responseBody() instanceof CreateTopicsResponse)) {
-            throw new StreamsException("Inconsistent response type for internal topic creation request. " +
-                "Expected CreateTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
-        }
-        final CreateTopicsResponse createTopicsResponse = (CreateTopicsResponse) clientResponse.responseBody();
-
-        for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
-            ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name());
-            if (error.isFailure() && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
-                throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
-            }
-        }
-    }
-
-    /**
-     *
-     * @param nodes List of nodes to pick from.
-     * @return The first node that is ready to accept requests.
-     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
-     */
-    private String ensureOneNodeIsReady(final List<Node> nodes) {
-        String brokerId = null;
-        final long readyTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
-        boolean foundNode = false;
-        while (!foundNode && (Time.SYSTEM.milliseconds() < readyTimeout)) {
-            for (Node node: nodes) {
-                if (kafkaClient.ready(node, Time.SYSTEM.milliseconds())) {
-                    brokerId = Integer.toString(node.id());
-                    foundNode = true;
-                    break;
-                }
-            }
-            try {
-                kafkaClient.poll(50, Time.SYSTEM.milliseconds());
-            } catch (final RuntimeException e) {
-                throw new StreamsException("Could not poll.", e);
-            }
-        }
-        if (brokerId == null) {
-            throw new BrokerNotFoundException("Could not find any available broker. " +
-                "Check your StreamsConfig setting '" + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG + "'. " +
-                "This error might also occur, if you try to connect to pre-0.10 brokers. " +
-                "Kafka Streams requires broker version 0.10.1.x or higher.");
-        }
-        return brokerId;
-    }
-
-    /**
-     * @return the Id of the controller node
-     * @throws BrokerNotFoundException if no controller is found, the controller is not ready,
-     *         or connecting failed within {@code request.timeout.ms}
-     */
-    private String getControllerReadyBrokerId(final MetadataResponse metadata) {
-        return ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
-    }
-
-    /**
-     * @return the Id of any broker that is ready
-     * @throws BrokerNotFoundException if no broker is ready within {@code request.timeout.ms}
-     */
-    private String getAnyReadyBrokerId() {
-        final Metadata metadata = new Metadata(
-            streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
-            false);
-        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), Time.SYSTEM.milliseconds());
-
-        final List<Node> nodes = metadata.fetch().nodes();
-        return ensureOneNodeIsReady(nodes);
-    }
-
-    /**
-     * @return the response to the request
-     * @throws TimeoutException if there was no response within {@code request.timeout.ms}
-     * @throws StreamsException for any other fatal error
-     */
-    private ClientResponse sendRequestSync(final ClientRequest clientRequest) {
-        try {
-            kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
-        } catch (final RuntimeException e) {
-            throw new StreamsException("Could not send request.", e);
-        }
-
-        // Poll for the response.
-        final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
-        while (Time.SYSTEM.milliseconds() < responseTimeout) {
-            final List<ClientResponse> responseList;
-            try {
-                responseList = kafkaClient.poll(100, Time.SYSTEM.milliseconds());
-            } catch (final RuntimeException e) {
-                throw new StreamsException("Could not poll.", e);
-            }
-            if (!responseList.isEmpty()) {
-                if (responseList.size() > 1) {
-                    throw new StreamsException("Sent one request but received multiple or no responses.");
-                }
-                final ClientResponse response = responseList.get(0);
-                if (response.requestHeader().correlationId() == clientRequest.correlationId()) {
-                    return response;
-                } else {
-                    throw new StreamsException("Inconsistent response received from the broker "
-                        + clientRequest.destination() + ", expected correlation id " + clientRequest.correlationId()
-                        + ", but received " + response.requestHeader().correlationId());
-                }
-            }
-        }
-
-        throw new TimeoutException("Failed to get response from broker within timeout");
-    }
-
-    /**
-     * Fetch the metadata for all topics.
-     *
-     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
-     * @throws TimeoutException if there was no response within {@code request.timeout.ms}
-     * @throws StreamsException for any other fatal error
-     */
-    public MetadataResponse fetchMetadata() {
-        final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getAnyReadyBrokerId(),
-            MetadataRequest.Builder.allTopics(),
-            Time.SYSTEM.milliseconds(),
-            true);
-        final ClientResponse clientResponse = sendRequestSync(clientRequest);
-
-        if (!clientResponse.hasResponse()) {
-            throw new StreamsException("Empty response for client request.");
-        }
-        if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
-            throw new StreamsException("Inconsistent response type for internal topic metadata request. " +
-                "Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
-        }
-        return (MetadataResponse) clientResponse.responseBody();
-    }
-
-}
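
The topic-creation path deleted above maps onto the public AdminClient API.
A minimal sketch of the equivalent createTopics() call; this is not code from
the commit, and the broker address and topic name below are hypothetical:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.errors.TopicExistsException;

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    public class AdminClientCreateTopicsSketch {
        public static void main(final String[] args) throws Exception {
            final Properties config = new Properties();
            config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            try (final AdminClient adminClient = AdminClient.create(config)) {
                final NewTopic topic = new NewTopic("my-app-repartition", 4, (short) 3) // hypothetical topic
                    .configs(Collections.singletonMap("cleanup.policy", "delete"));
                try {
                    adminClient.createTopics(Collections.singleton(topic)).all().get();
                } catch (final ExecutionException fatal) {
                    // mirror the removed client: an already-existing topic is not an error
                    if (!(fatal.getCause() instanceof TopicExistsException)) {
                        throw fatal;
                    }
                }
            }
        }
    }

Unlike the hand-rolled NetworkClient loop above, the AdminClient handles broker
discovery, retries, and request/response correlation internally.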

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index e6f05cd..d70c8f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -57,10 +57,7 @@ class TaskManager {
     private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
     private final StreamsMetadataState streamsMetadataState;
 
-    // TODO: this is going to be replaced by AdminClient
-    final StreamsKafkaClient streamsKafkaClient;
-
-    private final AdminClient adminClient;
+    final AdminClient adminClient;
     private DeleteRecordsResult deleteRecordsResult;
 
     // following information is updated during rebalance phase by the partition assignor
@@ -77,7 +74,6 @@ class TaskManager {
                 final StreamsMetadataState streamsMetadataState,
                 final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
                 final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
-                final StreamsKafkaClient streamsKafkaClient,
                 final AdminClient adminClient,
                 final AssignedStreamsTasks active,
                 final AssignedStandbyTasks standby) {
@@ -95,7 +91,6 @@ class TaskManager {
 
         this.log = logContext.logger(getClass());
 
-        this.streamsKafkaClient = streamsKafkaClient;
         this.adminClient = adminClient;
     }
 
@@ -283,8 +278,6 @@ class TaskManager {
         taskCreator.close();
         standbyTaskCreator.close();
 
-        streamsKafkaClient.close();
-
         final RuntimeException fatalException = firstException.get();
         if (fatalException != null) {
             throw fatalException;
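
The DeleteRecordsResult field retained above is produced by the admin client's
record-purging API. A short sketch of the kind of call that populates it; the
class name, partition, and offset are hypothetical, not taken from this commit:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;

    final class PurgeSketch {
        // delete all records below the given offset, as a task manager might do
        // for fully consumed repartition topics
        static void purgeBefore(final AdminClient adminClient,
                                final TopicPartition partition,
                                final long offset) throws Exception {
            adminClient
                .deleteRecords(Collections.singletonMap(partition, RecordsToDelete.beforeOffset(offset)))
                .all()
                .get(); // block until the low watermark has advanced
        }
    }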

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1a4cfb1..a06f7e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -28,6 +29,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
@@ -102,6 +104,40 @@ public class StreamsConfigTest {
     }
 
     @Test
+    public void consumerConfigMustContainStreamPartitionAssignorConfig() {
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 42);
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L);
+        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
+        props.put(StreamsConfig.RETRIES_CONFIG, 10);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+        final String groupId = "example-application";
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
+
+        assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
+        assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
+        assertEquals(StreamPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+        assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
+        assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
+        assertEquals(10, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+    }
+
+    @Test
+    public void consumerConfigMustUseAdminClientConfigForRetries() {
+        props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20);
+        props.put(StreamsConfig.RETRIES_CONFIG, 10);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+        final String groupId = "example-application";
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
+
+        assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+    }
+
+    @Test
     public void testGetRestoreConsumerConfigs() {
         final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
@@ -227,7 +263,13 @@ public class StreamsConfigTest {
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
-
+    @Test
+    public void shouldSupportNonPrefixedAdminConfigs() {
+        props.put(AdminClientConfig.RETRIES_CONFIG, 10);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> configs = streamsConfig.getAdminConfigs("clientId");
+        assertEquals(10, configs.get(AdminClientConfig.RETRIES_CONFIG));
+    }
 
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
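
The two tests added above exercise the new adminClientPrefix() config
namespace. A sketch of how an application would set both forms; the
application id and broker address are hypothetical. Per the test above, the
prefixed value wins for the embedded admin client:

    import org.apache.kafka.streams.StreamsConfig;

    import java.util.Properties;

    public class AdminPrefixConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-application"); // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical
            // plain retries: used by the admin client only as a fallback
            props.put(StreamsConfig.RETRIES_CONFIG, 10);
            // prefixed retries: takes precedence for the admin client
            props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20);
            new StreamsConfig(props);
        }
    }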

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
index 044e82a..581c8cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -88,8 +88,9 @@ public class InternalTopicConfigTest {
         assertTrue(new InternalTopicConfig("name",
                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                                            Collections.<String, String>emptyMap()).isCompacted());
-        assertTrue(new InternalTopicConfig("name", Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
-                                                               InternalTopicConfig.CleanupPolicy.delete),
+        assertTrue(new InternalTopicConfig("name",
+                                           Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
+                                                       InternalTopicConfig.CleanupPolicy.delete),
                                            Collections.<String, String>emptyMap()).isCompacted());
     }
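
The compact-plus-delete combination asserted above surfaces broker-side as a
single comma-separated config value. A sketch of a topic carrying both
policies; the topic name and retention value are hypothetical:

    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.HashMap;
    import java.util.Map;

    final class CompactDeleteSketch {
        static NewTopic windowedChangelog() {
            final Map<String, String> topicConfig = new HashMap<>();
            topicConfig.put("cleanup.policy", "compact,delete"); // both policies, one setting
            topicConfig.put("retention.ms", "86400000");         // hypothetical retention
            return new NewTopic("my-app-store-changelog", 1, (short) 1).configs(topicConfig);
        }
    }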
 

