kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-4857: Replace StreamsKafkaClient with AdminClient in Kafka Streams
Date Fri, 08 Dec 2017 00:16:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 86e2bc937 -> 234ec8a8a


http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index e914f9e..10d3dd9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -16,19 +16,14 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,172 +31,169 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
-import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class InternalTopicManagerTest {
 
+    private final Node broker1 = new Node(0, "dummyHost-1", 1234);
+    private final Node broker2 = new Node(1, "dummyHost-2", 1234);
+    private final List<Node> cluster = new ArrayList<Node>(2) {
+        {
+            add(broker1);
+            add(broker2);
+        }
+    };
     private final String topic = "test_topic";
-    private final String userEndPoint = "localhost:2171";
-    private MockStreamKafkaClient streamsKafkaClient;
-    private final Time time = new MockTime();
+    private final String topic2 = "test_topic_2";
+    private final List<Node> singleReplica = Collections.singletonList(broker1);
+
+    private MockAdminClient mockAdminClient;
+    private InternalTopicManager internalTopicManager;
+
+    private final Map<String, Object> config = new HashMap<String, Object>() {
+        {
+            put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
+            put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + broker1.port());
+            put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
+            put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 1);
+        }
+    };
 
     @Before
     public void init() {
-        final StreamsConfig config = new StreamsConfig(configProps());
-        streamsKafkaClient = new MockStreamKafkaClient(config);
+        mockAdminClient = new MockAdminClient(cluster);
+        internalTopicManager = new InternalTopicManager(
+            mockAdminClient,
+            config);
     }
 
     @After
     public void shutdown() throws IOException {
-        streamsKafkaClient.close();
+        mockAdminClient.close();
     }
 
     @Test
     public void shouldReturnCorrectPartitionCounts() {
-        final InternalTopicManager internalTopicManager = new InternalTopicManager(
-            streamsKafkaClient,
-            1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
-            time);
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
+            null);
         assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
     }
 
     @Test
-    public void shouldCreateRequiredTopics() {
-        streamsKafkaClient.returnNoMetadata = true;
+    public void shouldFailWithUnknownTopicException() {
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
+            null);
 
-        final InternalTopicManager internalTopicManager = new InternalTopicManager(
-            streamsKafkaClient,
-            1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
-            time);
+        try {
+            internalTopicManager.getNumPartitions(new HashSet<String>() {
+                {
+                    add(topic);
+                    add(topic2);
+                }
+            });
+            fail("Should have thrown UnknownTopicOrPartitionException.");
+        } catch (final StreamsException expected) {
+            assertTrue(expected.getCause() instanceof UnknownTopicOrPartitionException);
+        }
+    }
 
-        final InternalTopicConfig topicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null);
-        internalTopicManager.makeReady(Collections.singletonMap(topicConfig, 1));
+    @Test
+    public void shouldExhaustRetriesOnTimeoutExceptionForGetNumPartitions() {
+        mockAdminClient.timeoutNextRequest(2);
 
-        assertEquals(Collections.singletonMap(topic, topicConfig), streamsKafkaClient.createdTopics);
-        assertEquals(Collections.singletonMap(topic, 1), streamsKafkaClient.numberOfPartitionsPerTopic);
-        assertEquals(Collections.singletonMap(topic, 1), streamsKafkaClient.replicationFactorPerTopic);
+        try {
+            internalTopicManager.getNumPartitions(Collections.singleton(topic));
+            fail("Should have thrown StreamsException.");
+        } catch (final StreamsException expected) {
+            assertNull(expected.getCause());
+            assertEquals("Could not get number of partitions from brokers. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", expected.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldCreateRequiredTopics() throws Exception {
+        final InternalTopicConfig topicConfig = new InternalTopicConfig(topic,  Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
+        topicConfig.setNumberOfPartitions(1);
+        internalTopicManager.makeReady(Collections.singletonMap(topic, topicConfig));
+
+        assertEquals(Collections.singleton(topic), mockAdminClient.listTopics().names().get());
+        assertEquals(new TopicDescription(topic, false, new ArrayList<TopicPartitionInfo>() {
+            {
+                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+            }
+        }), mockAdminClient.describeTopics(Collections.singleton(topic)).values().get(topic).get());
     }
 
     @Test
     public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
-        final InternalTopicManager internalTopicManager = new InternalTopicManager(
-            streamsKafkaClient,
-            1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
-            time);
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            new ArrayList<TopicPartitionInfo>() {
+                {
+                    add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+                    add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.<Node>emptyList()));
+                }
+            },
+            null);
+
         try {
-            internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2));
-            Assert.fail("Should have thrown StreamsException");
+            final InternalTopicConfig internalTopicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.<String, String>emptyMap());
+            internalTopicConfig.setNumberOfPartitions(1);
+            internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
+            fail("Should have thrown StreamsException");
         } catch (StreamsException expected) { /* pass */ }
     }
 
     @Test
     public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
-
-        // create topic the first time with replication 2
-        final InternalTopicManager internalTopicManager = new InternalTopicManager(
-            streamsKafkaClient,
-            2,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
-            time);
-        internalTopicManager.makeReady(Collections.singletonMap(
-            new InternalTopicConfig(topic,
-                                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                    null),
-            1));
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.<Node>emptyList())),
+            null);
 
         // attempt to create it again with replication 1
         final InternalTopicManager internalTopicManager2 = new InternalTopicManager(
-            streamsKafkaClient,
-            1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
-            time);
-
-        internalTopicManager2.makeReady(Collections.singletonMap(
-            new InternalTopicConfig(topic,
-                                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                   null),
-            1));
+            mockAdminClient,
+            config);
+
+        final InternalTopicConfig internalTopicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.<String, String>emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        internalTopicManager2.makeReady(Collections.singletonMap(topic, internalTopicConfig));
     }
 
     @Test
     public void shouldNotThrowExceptionForEmptyTopicMap() {
-        final InternalTopicManager internalTopicManager = new InternalTopicManager(
-            streamsKafkaClient,
-            1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
-            time);
-
-        internalTopicManager.makeReady(Collections.<InternalTopicConfig, Integer>emptyMap());
-    }
-
-    private Properties configProps() {
-        return new Properties() {
-            {
-                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "Internal-Topic-ManagerTest");
-                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
-                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
-            }
-        };
+        internalTopicManager.makeReady(Collections.<String, InternalTopicConfig>emptyMap());
     }
 
-    private class MockStreamKafkaClient extends StreamsKafkaClient {
-
-        boolean returnNoMetadata = false;
-
-        Map<String, InternalTopicConfig> createdTopics = new HashMap<>();
-        Map<String, Integer> numberOfPartitionsPerTopic = new HashMap<>();
-        Map<String, Integer> replicationFactorPerTopic = new HashMap<>();
-
-        MockStreamKafkaClient(final StreamsConfig streamsConfig) {
-            super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig.originals()),
-                  new MockClient(new MockTime()),
-                  Collections.<MetricsReporter>emptyList(),
-                  new LogContext());
-        }
-
-        @Override
-        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
-                                 final int replicationFactor,
-                                 final long windowChangeLogAdditionalRetention,
-                                 final MetadataResponse metadata) {
-            for (final Map.Entry<InternalTopicConfig, Integer> topic : topicsMap.entrySet()) {
-                final InternalTopicConfig config = topic.getKey();
-                final String topicName = config.name();
-                createdTopics.put(topicName, config);
-                numberOfPartitionsPerTopic.put(topicName, topic.getValue());
-                replicationFactorPerTopic.put(topicName, replicationFactor);
-            }
-        }
-
-        @Override
-        public MetadataResponse fetchMetadata() {
-            final Node node = new Node(1, "host1", 1001);
-            final MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>(), new ArrayList<Node>());
-            final MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
-            final MetadataResponse metadataResponse;
-            if (returnNoMetadata) {
-                metadataResponse = new MetadataResponse(
-                    Collections.<Node>singletonList(node),
-                    null,
-                    MetadataResponse.NO_CONTROLLER_ID,
-                    Collections.<MetadataResponse.TopicMetadata>emptyList());
-            } else {
-                metadataResponse = new MetadataResponse(
-                    Collections.<Node>singletonList(node),
-                    null,
-                    MetadataResponse.NO_CONTROLLER_ID,
-                    Collections.singletonList(topicMetadata));
-            }
+    @Test
+    public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
+        mockAdminClient.timeoutNextRequest(4);
 
-            return metadataResponse;
+        final InternalTopicConfig internalTopicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.<String, String>emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        try {
+            internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
+            fail("Should have thrown StreamsException.");
+        } catch (final StreamsException expected) {
+            assertNull(expected.getCause());
+            assertEquals("Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", expected.getMessage());
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 99bb56d..d9e0ee7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -105,7 +105,7 @@ public class StreamPartitionAssignorTest {
     private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
-    private final StreamsConfig config = new StreamsConfig(configProps());
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
     private final String userEndPoint = "localhost:8080";
     private final String applicationId = "stream-partition-assignor-test";
 
@@ -136,7 +136,6 @@ public class StreamPartitionAssignorTest {
         EasyMock.replay(taskManager);
     }
 
-
     @Test
     public void testSubscription() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -165,7 +164,6 @@ public class StreamPartitionAssignorTest {
         assertEquals(info.encode(), subscription.userData());
     }
 
-
     @Test
     public void testAssignBasic() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -187,7 +185,7 @@ public class StreamPartitionAssignorTest {
         mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -246,7 +244,7 @@ public class StreamPartitionAssignorTest {
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG,
(Object) SingleGroupPartitionGrouperStub.class));
 
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
             new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(),
Collections.<TaskId>emptySet(), userEndPoint).encode()));
@@ -339,7 +337,7 @@ public class StreamPartitionAssignorTest {
         mockTaskManager(prevTasks10, Collections.<TaskId>emptySet(), uuid1, builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -406,7 +404,7 @@ public class StreamPartitionAssignorTest {
                 builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -466,7 +464,7 @@ public class StreamPartitionAssignorTest {
     public void testAssignWithStandbyReplicas() throws Exception {
         Map<String, Object> props = configProps();
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
-        StreamsConfig config = new StreamsConfig(props);
+        StreamsConfig streamsConfig = new StreamsConfig(props);
 
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
@@ -489,7 +487,7 @@ public class StreamPartitionAssignorTest {
 
         configurePartitionAssignor(Collections.<String, Object>singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
1));
 
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -584,7 +582,7 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
-        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer);
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -619,7 +617,7 @@ public class StreamPartitionAssignorTest {
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, builder);
 
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
-        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer);
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -666,7 +664,7 @@ public class StreamPartitionAssignorTest {
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
(Object) userEndPoint));
 
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         final Set<TaskId> emptyTasks = Collections.emptySet();
@@ -687,7 +685,7 @@ public class StreamPartitionAssignorTest {
         builder.setApplicationId(applicationId);
 
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
UUID.randomUUID(), builder);
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         try {
             configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
(Object) "localhost"));
@@ -779,7 +777,7 @@ public class StreamPartitionAssignorTest {
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
-            config,
+            streamsConfig,
             mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
@@ -849,7 +847,7 @@ public class StreamPartitionAssignorTest {
         props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint);
         configurePartitionAssignor(props);
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(
-            config,
+            streamsConfig,
             mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index daf6fad..69ae07c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -249,7 +249,6 @@ public class StreamThreadTest {
                 consumer,
                 consumer,
                 null,
-                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                 taskManager,
                 streamsMetrics,
                 internalTopologyBuilder,
@@ -281,7 +280,6 @@ public class StreamThreadTest {
                 consumer,
                 consumer,
                 null,
-                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                 taskManager,
                 streamsMetrics,
                 internalTopologyBuilder,
@@ -313,7 +311,6 @@ public class StreamThreadTest {
                 consumer,
                 consumer,
                 null,
-                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                 taskManager,
                 streamsMetrics,
                 internalTopologyBuilder,
@@ -443,7 +440,6 @@ public class StreamThreadTest {
                 consumer,
                 consumer,
                 null,
-                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                 taskManager,
                 streamsMetrics,
                 internalTopologyBuilder,

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
deleted file mode 100644
index 660a622..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.metrics.KafkaMetric;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateTopicsRequest;
-import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Arrays.asList;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-public class StreamsKafkaClientTest {
-
-    private static final String TOPIC = "topic";
-    private final MockClient kafkaClient = new MockClient(new MockTime());
-    private final List<MetricsReporter> reporters = Collections.emptyList();
-    private final MetadataResponse metadata = new MetadataResponse(Collections.singletonList(new
Node(1, "host", 90)), "cluster", 1, Collections.<MetadataResponse.TopicMetadata>emptyList());
-    private final Map<String, Object> config = new HashMap<>();
-    private final InternalTopicConfig topicConfigWithNoOverrides = new InternalTopicConfig(TOPIC,
-                                                                                        
  Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                                                                        
  Collections.<String, String>emptyMap());
-
-    private final Map<String, String> overridenTopicConfig = Collections.singletonMap(TopicConfig.DELETE_RETENTION_MS_CONFIG,
"100");
-    private final InternalTopicConfig topicConfigWithOverrides = new InternalTopicConfig(TOPIC,
-                                                                                        
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                                                                                        
overridenTopicConfig);
-
-
-    @Before
-    public void before() {
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "some_app_id");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
-    }
-
-    @Test
-    public void testConfigFromStreamsConfig() {
-        for (final String expectedMechanism : asList("PLAIN", "SCRAM-SHA-512")) {
-            config.put(SaslConfigs.SASL_MECHANISM, expectedMechanism);
-            final AbstractConfig abstractConfig = StreamsKafkaClient.Config.fromStreamsConfig(config);
-            assertEquals(expectedMechanism, abstractConfig.values().get(SaslConfigs.SASL_MECHANISM));
-            assertEquals(expectedMechanism, abstractConfig.getString(SaslConfigs.SASL_MECHANISM));
-        }
-    }
-
-    @Test
-    public void shouldAddCleanupPolicyToTopicConfigWhenCreatingTopic() throws Exception {
-        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
-        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy",
"delete"));
-    }
-
-
-    @Test
-    public void shouldAddDefaultTopicConfigFromStreamConfig() throws Exception {
-        config.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "100");
-        config.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG), "gzip");
-
-        final Map<String, String> expectedConfigs = new HashMap<>();
-        expectedConfigs.put(TopicConfig.SEGMENT_MS_CONFIG, "100");
-        expectedConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip");
-        expectedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
-        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
-        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, expectedConfigs);
-    }
-
-    @Test
-    public void shouldSetPropertiesDefinedByInternalTopicConfig() throws Exception {
-        final Map<String, String> expectedConfigs = new HashMap<>(overridenTopicConfig);
-        expectedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
-        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
-        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithOverrides, expectedConfigs);
-    }
-
-    @Test
-    public void shouldOverrideDefaultTopicConfigsFromStreamsConfig() throws Exception {
-        config.put(StreamsConfig.topicPrefix(TopicConfig.DELETE_RETENTION_MS_CONFIG), "99999");
-        config.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "988");
-
-        final Map<String, String> expectedConfigs = new HashMap<>(overridenTopicConfig);
-        expectedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
-        expectedConfigs.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "100");
-        expectedConfigs.put(TopicConfig.SEGMENT_MS_CONFIG, "988");
-        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
-        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithOverrides, expectedConfigs);
-    }
-
-    @Test
-    public void shouldNotAllowNullTopicConfigs() throws Exception {
-        config.put(StreamsConfig.topicPrefix(TopicConfig.DELETE_RETENTION_MS_CONFIG), null);
-        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
-        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy",
"delete"));
-    }
-
-    @Test
-    public void metricsShouldBeTaggedWithClientId() {
-        config.put(StreamsConfig.CLIENT_ID_CONFIG, "some_client_id");
-        config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, TestMetricsReporter.class.getName());
-        StreamsKafkaClient.create(config);
-        assertFalse(TestMetricsReporter.METRICS.isEmpty());
-        for (KafkaMetric kafkaMetric : TestMetricsReporter.METRICS.values()) {
-            assertEquals("some_client_id", kafkaMetric.metricName().tags().get("client-id"));
-        }
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionOnEmptyFetchMetadataResponse() {
-        kafkaClient.prepareResponse(null);
-        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
-        streamsKafkaClient.fetchMetadata();
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionWhenFetchMetadataResponseInconsistent() {
-        kafkaClient.prepareResponse(new ProduceResponse(Collections.<TopicPartition, ProduceResponse.PartitionResponse>emptyMap()));
-        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
-        streamsKafkaClient.fetchMetadata();
-    }
-
-    private void verifyCorrectTopicConfigs(final StreamsKafkaClient streamsKafkaClient,
-                                           final InternalTopicConfig internalTopicConfig,
-                                           final Map<String, String> expectedConfigs)
{
-        final Map<String, String> requestedTopicConfigs = new HashMap<>();
-
-        kafkaClient.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(final AbstractRequest body) {
-                if (!(body instanceof CreateTopicsRequest)) {
-                    return false;
-                }
-                final CreateTopicsRequest request = (CreateTopicsRequest) body;
-                final Map<String, CreateTopicsRequest.TopicDetails> topics =
-                        request.topics();
-                final CreateTopicsRequest.TopicDetails topicDetails = topics.get(TOPIC);
-                requestedTopicConfigs.putAll(topicDetails.configs);
-                return true;
-            }
-        }, new CreateTopicsResponse(Collections.singletonMap(TOPIC, ApiError.NONE)));
-
-        streamsKafkaClient.createTopics(Collections.singletonMap(internalTopicConfig, 1),
1, 1, metadata);
-
-        assertThat(requestedTopicConfigs, equalTo(expectedConfigs));
-    }
-
-    private StreamsKafkaClient createStreamsKafkaClient() {
-        return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(config),
-                                      kafkaClient,
-                                      reporters,
-                                      new LogContext());
-    }
-
-
-    public static class TestMetricsReporter implements MetricsReporter {
-        static final Map<MetricName, KafkaMetric> METRICS = new HashMap<>();
-
-        @Override
-        public void configure(final Map<String, ?> configs) { }
-
-        @Override
-        public void init(final List<KafkaMetric> metrics) {
-            for (final KafkaMetric metric : metrics) {
-                metricChange(metric);
-            }
-        }
-
-        @Override
-        public void metricChange(final KafkaMetric metric) {
-            METRICS.put(metric.metricName(), metric);
-        }
-
-        @Override
-        public void metricRemoval(final KafkaMetric metric) {
-            METRICS.remove(metric.metricName());
-        }
-
-        @Override
-        public void close() {
-            METRICS.clear();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 37a683c..648e9b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
-
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
@@ -88,8 +87,6 @@ public class TaskManagerTest {
     @Mock(type = MockType.NICE)
     private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
     @Mock(type = MockType.NICE)
-    private StreamsKafkaClient streamsKafkaClient;
-    @Mock(type = MockType.NICE)
     private AdminClient adminClient;
     @Mock(type = MockType.NICE)
     private StreamTask streamTask;
@@ -128,7 +125,6 @@ public class TaskManagerTest {
                                       streamsMetadataState,
                                       activeTaskCreator,
                                       standbyTaskCreator,
-                                      streamsKafkaClient,
                                       adminClient,
                                       active,
                                       standby);
@@ -697,4 +693,4 @@ public class TaskManagerTest {
         expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc"));
         expect(topologyBuilder.subscriptionUpdates()).andReturn(subscriptionUpdates);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index ef1f63c..b308782 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -94,6 +94,7 @@ public class BrokerCompatibilityTest {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
                 System.err.println("FATAL: An unexpected exception " + e);
+                e.printStackTrace(System.err);
                 System.err.flush();
                 streams.close(30, TimeUnit.SECONDS);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 598ca8d..3db7e53 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -16,13 +16,12 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopicManager;
-import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,26 +33,29 @@ import java.util.Set;
 
 public class MockInternalTopicManager extends InternalTopicManager {
 
-    public Map<String, Integer> readyTopics = new HashMap<>();
-    private MockConsumer<byte[], byte[]> restoreConsumer;
+    final public Map<String, Integer> readyTopics = new HashMap<>();
+    final private MockConsumer<byte[], byte[]> restoreConsumer;
 
-    public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[],
byte[]> restoreConsumer) {
-        super(StreamsKafkaClient.create(streamsConfig.originals()), 0, 0, new MockTime());
+    public MockInternalTopicManager(final StreamsConfig streamsConfig,
+                                    final MockConsumer<byte[], byte[]> restoreConsumer)
{
+        super(KafkaAdminClient.create(streamsConfig.originals()), streamsConfig.originals());
 
         this.restoreConsumer = restoreConsumer;
     }
 
     @Override
-    public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
-        for (Map.Entry<InternalTopicConfig, Integer> entry : topics.entrySet()) {
-            readyTopics.put(entry.getKey().name(), entry.getValue());
+    public void makeReady(final Map<String, InternalTopicConfig> topics) {
+        for (final InternalTopicConfig topic : topics.values()) {
+            final String topicName = topic.name();
+            final int numberOfPartitions = topic.numberOfPartitions();
+            readyTopics.put(topicName, numberOfPartitions);
 
             final List<PartitionInfo> partitions = new ArrayList<>();
-            for (int i = 0; i < entry.getValue(); i++) {
-                partitions.add(new PartitionInfo(entry.getKey().name(), i, null, null, null));
+            for (int i = 0; i < numberOfPartitions; i++) {
+                partitions.add(new PartitionInfo(topicName, i, null, null, null));
             }
 
-            restoreConsumer.updatePartitions(entry.getKey().name(), partitions);
+            restoreConsumer.updatePartitions(topicName, partitions);
         }
     }
 
@@ -66,4 +68,4 @@ public class MockInternalTopicManager extends InternalTopicManager {
 
         return partitions;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/234ec8a8/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 92a8c1e..1eb46ef 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -102,9 +102,9 @@ class StreamsBrokerCompatibility(Test):
 
         processor.node.account.ssh(processor.start_cmd(processor.node))
         with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
-            monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException:
Could not create internal topics.',
+            monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException:
Could not create topic kafka-streams-system-test-broker-compatibility-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.',
                         timeout_sec=60,
-                        err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException:
Could not create internal topics.' error message " + str(processor.node.account))
+                        err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException:
Could not create topic kafka-streams-system-test-broker-compatibility-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.'
error message " + str(processor.node.account))
 
         self.kafka.stop()
 


Mime
View raw message