This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5c24295 Trogdor's ProducerBench does not fail if topics exists (#4673) 5c24295 is described below commit 5c24295d44b4f015a620914992542c4d2083c611 Author: Anna Povzner AuthorDate: Tue Mar 20 06:51:45 2018 -0700 Trogdor's ProducerBench does not fail if topics exists (#4673) Added configs to ProduceBenchSpec: topicPrefix: topic names have the format topicPrefix + topic index (the index is zero-padded to five digits). If not provided, the default is "produceBenchTopic". partitionsPerTopic: number of partitions per topic. If not provided, the default is 1. replicationFactor: replication factor per topic. If not provided, the default is 3. The behavior of producer bench is changed such that if some or all topics already exist (with topic names = topicPrefix + topic index), and they have the same number of partitions as requested, the worker uses those topics and does not fail. The producer bench fails if one or more existing topics have a number of partitions that is different from the expected number of partitions. Added unit tests for WorkerUtils -- for existing methods and new methods. Fixed a bug in MockAdminClient, where createTopics() would overwrite an existing topic's replication factor and number of partitions while correctly completing the appropriate futures exceptionally with TopicExistsException. Reviewers: Colin P. 
Mccabe , Rajini Sivaram --- checkstyle/suppressions.xml | 2 + .../kafka/clients/admin/MockAdminClient.java | 1 + .../apache/kafka/trogdor/common/WorkerUtils.java | 194 ++++++++++++++----- .../kafka/trogdor/workload/ProduceBenchSpec.java | 33 +++- .../kafka/trogdor/workload/ProduceBenchWorker.java | 21 +- .../kafka/trogdor/workload/RoundTripWorker.java | 7 +- .../trogdor/common/JsonSerializationTest.java | 2 +- .../kafka/trogdor/common/WorkerUtilsTest.java | 211 +++++++++++++++++++++ 8 files changed, 405 insertions(+), 66 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 62fe4ed..e3bf151 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -212,6 +212,8 @@ files="SignalLogger.java"/> + replicas = new ArrayList<>(replicationFactor); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 58f8278..99c13c0 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -19,9 +19,14 @@ package org.apache.kafka.trogdor.common; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeTopicsOptions; +import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -33,7 +38,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; 
import java.util.Properties; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; /** @@ -71,7 +75,7 @@ public final class WorkerUtils { } private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000; - private static final int CREATE_TOPICS_CALL_TIMEOUT = 90000; + private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000; private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10; //Map>> topics) throws Throwable { @@ -82,61 +86,151 @@ public final class WorkerUtils { * @param log The logger to use. * @param bootstrapServers The bootstrap server list. * @param topics Maps topic names to partition assignments. + * @param failOnExisting If true, the method will throw TopicExistsException if one or + * more topics already exist. Otherwise, the existing topics are + * verified for number of partitions. In this case, if number of + * partitions of an existing topic does not match the requested + * number of partitions, the method throws RuntimeException. */ - public static void createTopics(Logger log, String bootstrapServers, - Collection topics) throws Throwable { - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT); - try (AdminClient adminClient = AdminClient.create(props)) { - long startMs = Time.SYSTEM.milliseconds(); - int tries = 0; + public static void createTopics( + Logger log, String bootstrapServers, + Map topics, boolean failOnExisting) throws Throwable { + // this method wraps the call to createTopics() that takes admin client, so that we can + // unit test the functionality with MockAdminClient. The exception is caught and + // re-thrown so that admin client is closed when the method returns. 
+ try (AdminClient adminClient = createAdminClient(bootstrapServers)) { + createTopics(log, adminClient, topics, failOnExisting); + } catch (Exception e) { + log.warn("Failed to create or verify topics {}", topics, e); + throw e; + } + } + + /** + * The actual create topics functionality is separated into this method and called from the + * above method to be able to unit test with mock adminClient. + */ + static void createTopics( + Logger log, AdminClient adminClient, + Map topics, boolean failOnExisting) throws Throwable { + if (topics.isEmpty()) { + log.warn("Request to create topics has an empty topic list."); + return; + } - Map newTopics = new HashMap<>(); - for (NewTopic newTopic : topics) { - newTopics.put(newTopic.name(), newTopic); + Collection topicsExists = createTopics(log, adminClient, topics.values()); + if (!topicsExists.isEmpty()) { + if (failOnExisting) { + log.warn("Topic(s) {} already exist.", topicsExists); + throw new TopicExistsException("One or more topics already exist."); + } else { + verifyTopics(log, adminClient, topicsExists, topics); } - List topicsToCreate = new ArrayList<>(newTopics.keySet()); - while (true) { - log.info("Attemping to create {} topics (try {})...", topicsToCreate.size(), ++tries); - Map> creations = new HashMap<>(); - while (!topicsToCreate.isEmpty()) { - List newTopicsBatch = new ArrayList<>(); - for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) && - !topicsToCreate.isEmpty(); i++) { - String topicName = topicsToCreate.remove(0); - newTopicsBatch.add(newTopics.get(topicName)); - } - creations.putAll(adminClient.createTopics(newTopicsBatch).values()); + } + } + + /** + * Creates Kafka topics and returns a list of topics that already exist + * @param log The logger to use + * @param adminClient AdminClient + * @param topics List of topics to create + * @return Collection of topics names that already exist. + * @throws Throwable if creation of one or more topics fails (except for topic exists case). 
+ */ + private static Collection createTopics(Logger log, AdminClient adminClient, + Collection topics) throws Throwable { + long startMs = Time.SYSTEM.milliseconds(); + int tries = 0; + List existingTopics = new ArrayList<>(); + + Map newTopics = new HashMap<>(); + for (NewTopic newTopic : topics) { + newTopics.put(newTopic.name(), newTopic); + } + List topicsToCreate = new ArrayList<>(newTopics.keySet()); + while (true) { + log.info("Attempting to create {} topics (try {})...", topicsToCreate.size(), ++tries); + Map> creations = new HashMap<>(); + while (!topicsToCreate.isEmpty()) { + List newTopicsBatch = new ArrayList<>(); + for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) && + !topicsToCreate.isEmpty(); i++) { + String topicName = topicsToCreate.remove(0); + newTopicsBatch.add(newTopics.get(topicName)); } - // We retry cases where the topic creation failed with a - // timeout. This is a workaround for KAFKA-6368. - for (Map.Entry> entry : creations.entrySet()) { - String topicName = entry.getKey(); - Future future = entry.getValue(); - try { - future.get(); - log.debug("Successfully created {}.", topicName); - } catch (ExecutionException e) { - if (e.getCause() instanceof TimeoutException) { - log.warn("Timed out attempting to create {}: {}", topicName, e.getCause().getMessage()); - topicsToCreate.add(topicName); - } else { - log.warn("Failed to create {}", topicName, e.getCause()); - throw e.getCause(); - } + creations.putAll(adminClient.createTopics(newTopicsBatch).values()); + } + // We retry cases where the topic creation failed with a + // timeout. This is a workaround for KAFKA-6368. 
+ for (Map.Entry> entry : creations.entrySet()) { + String topicName = entry.getKey(); + Future future = entry.getValue(); + try { + future.get(); + log.debug("Successfully created {}.", topicName); + } catch (Exception e) { + if ((e.getCause() instanceof TimeoutException) + || (e.getCause() instanceof NotEnoughReplicasException)) { + log.warn("Attempt to create topic `{}` failed: {}", topicName, + e.getCause().getMessage()); + topicsToCreate.add(topicName); + } else if (e.getCause() instanceof TopicExistsException) { + log.info("Topic {} already exists.", topicName); + existingTopics.add(topicName); + } else { + log.warn("Failed to create {}", topicName, e.getCause()); + throw e.getCause(); } } - if (topicsToCreate.isEmpty()) { - break; - } - if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) { - String str = "Unable to create topic(s): " + - Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)"; - log.warn(str); - throw new TimeoutException(str); - } + } + if (topicsToCreate.isEmpty()) { + break; + } + if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) { + String str = "Unable to create topic(s): " + + Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)"; + log.warn(str); + throw new TimeoutException(str); + } + } + return existingTopics; + } + + /** + * Verifies that topics in 'topicsToVerify' list have the same number of partitions as + * described in 'topicsInfo' + * @param log The logger to use + * @param adminClient AdminClient + * @param topicsToVerify List of topics to verify + * @param topicsInfo Map of topic name to topic description, which includes topics in + * 'topicsToVerify' list. 
+ * @throws RuntimeException If one or more topics have different number of partitions than + * described in 'topicsInfo' + */ + private static void verifyTopics( + Logger log, AdminClient adminClient, + Collection topicsToVerify, Map topicsInfo) throws Throwable { + DescribeTopicsResult topicsResult = adminClient.describeTopics( + topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT)); + Map topicDescriptionMap = topicsResult.all().get(); + for (TopicDescription desc: topicDescriptionMap.values()) { + // map will always contain the topic since all topics in 'topicsExists' are in given + // 'topics' map + int partitions = topicsInfo.get(desc.name()).numPartitions(); + if (desc.partitions().size() != partitions) { + String str = "Topic '" + desc.name() + "' exists, but has " + + desc.partitions().size() + " partitions, while requested " + + " number of partitions is " + partitions; + log.warn(str); + throw new RuntimeException(str); } } } + + private static AdminClient createAdminClient(String bootstrapServers) { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT); + return AdminClient.create(props); + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java index a798e73..7b1bedd 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java @@ -33,6 +33,11 @@ import java.util.Set; * The specification for a benchmark that produces messages to a set of topics. 
*/ public class ProduceBenchSpec extends TaskSpec { + + private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic"; + private static final int DEFAULT_NUM_PARTITIONS = 1; + private static final short DEFAULT_REPLICATION_FACTOR = 3; + private final String producerNode; private final String bootstrapServers; private final int targetMessagesPerSec; @@ -42,6 +47,9 @@ public class ProduceBenchSpec extends TaskSpec { private final Map producerConf; private final int totalTopics; private final int activeTopics; + private final String topicPrefix; + private final int numPartitions; + private final short replicationFactor; @JsonCreator public ProduceBenchSpec(@JsonProperty("startMs") long startMs, @@ -54,7 +62,10 @@ public class ProduceBenchSpec extends TaskSpec { @JsonProperty("valueGenerator") PayloadGenerator valueGenerator, @JsonProperty("producerConf") Map producerConf, @JsonProperty("totalTopics") int totalTopics, - @JsonProperty("activeTopics") int activeTopics) { + @JsonProperty("activeTopics") int activeTopics, + @JsonProperty("topicPrefix") String topicPrefix, + @JsonProperty("partitionsPerTopic") int partitionsPerTopic, + @JsonProperty("replicationFactor") short replicationFactor) { super(startMs, durationMs); this.producerNode = (producerNode == null) ? "" : producerNode; this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; @@ -67,6 +78,11 @@ public class ProduceBenchSpec extends TaskSpec { this.producerConf = (producerConf == null) ? new TreeMap() : producerConf; this.totalTopics = totalTopics; this.activeTopics = activeTopics; + this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix; + this.numPartitions = (partitionsPerTopic == 0) + ? DEFAULT_NUM_PARTITIONS : partitionsPerTopic; + this.replicationFactor = (replicationFactor == 0) + ? 
DEFAULT_REPLICATION_FACTOR : replicationFactor; } @JsonProperty @@ -114,6 +130,21 @@ public class ProduceBenchSpec extends TaskSpec { return activeTopics; } + @JsonProperty + public String topicPrefix() { + return topicPrefix; + } + + @JsonProperty + public int numPartitions() { + return numPartitions; + } + + @JsonProperty + public short replicationFactor() { + return replicationFactor; + } + @Override public TaskController newController(String id) { return new TaskController() { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 51f52d3..e291bae 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -37,8 +37,7 @@ import org.apache.kafka.trogdor.task.TaskWorker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; @@ -51,11 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; public class ProduceBenchWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class); - - private static final short NUM_PARTITIONS = 1; - - private static final short REPLICATION_FACTOR = 3; - + private static final int THROTTLE_PERIOD_MS = 100; private final String id; @@ -76,8 +71,8 @@ public class ProduceBenchWorker implements TaskWorker { * @param topicIndex The topic number. * @return The topic name. 
*/ - public static String topicIndexToName(int topicIndex) { - return String.format("topic%05d", topicIndex); + public String topicIndexToName(int topicIndex) { + return String.format("%s%05d", spec.topicPrefix(), topicIndex); } public ProduceBenchWorker(String id, ProduceBenchSpec spec) { @@ -111,11 +106,13 @@ public class ProduceBenchWorker implements TaskWorker { "activeTopics was %d, but totalTopics was only %d. activeTopics must " + "be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics())); } - List newTopics = new ArrayList<>(); + Map newTopics = new HashMap<>(); for (int i = 0; i < spec.totalTopics(); i++) { - newTopics.add(new NewTopic(topicIndexToName(i), NUM_PARTITIONS, REPLICATION_FACTOR)); + String name = topicIndexToName(i); + newTopics.put(name, new NewTopic(name, spec.numPartitions(), spec.replicationFactor())); } - WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics); + WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics, false); + executor.submit(new SendRecords()); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 1b9cb8f..a05785c 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -123,8 +123,11 @@ public class RoundTripWorker implements TaskWorker { if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) { throw new ConfigException("Invalid null or empty partitionAssignments."); } - WorkerUtils.createTopics(log, spec.bootstrapServers(), - Collections.singletonList(new NewTopic(TOPIC_NAME, spec.partitionAssignments()))); + WorkerUtils.createTopics( + log, spec.bootstrapServers(), + Collections.singletonMap(TOPIC_NAME, + new NewTopic(TOPIC_NAME, 
spec.partitionAssignments())), + true); executor.submit(new ProducerRunnable()); executor.submit(new ConsumerRunnable()); } catch (Throwable e) { diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index 77a7932..dee7614 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -49,7 +49,7 @@ public class JsonSerializationTest { verify(new WorkerRunning(null, 0, null)); verify(new WorkerStopping(null, 0, null)); verify(new ProduceBenchSpec(0, 0, null, null, - 0, 0, null, null, null, 0, 0)); + 0, 0, null, null, null, 0, 0, "test-topic", 1, (short) 3)); verify(new RoundTripWorkloadSpec(0, 0, null, null, 0, null, null, 0)); verify(new SampleTaskSpec(0, 0, 0, null)); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java new file mode 100644 index 0000000..22b7846 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.common; + + + +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartitionInfo; + +import org.apache.kafka.common.Node; +import org.apache.kafka.clients.admin.MockAdminClient; + +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kafka.clients.admin.NewTopic; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class WorkerUtilsTest { + + private static final Logger log = LoggerFactory.getLogger(WorkerUtilsTest.class); + + private final Node broker1 = new Node(0, "testHost-1", 1234); + private final Node broker2 = new Node(1, "testHost-2", 1234); + private final Node broker3 = new Node(1, "testHost-3", 1234); + private final List cluster = Arrays.asList(broker1, broker2, broker3); + private final List singleReplica = Collections.singletonList(broker1); + + private static final String TEST_TOPIC = "test-topic-1"; + private static final short TEST_REPLICATION_FACTOR = 1; + private static final int TEST_PARTITIONS = 1; + private static final NewTopic NEW_TEST_TOPIC = + new NewTopic(TEST_TOPIC, TEST_PARTITIONS, TEST_REPLICATION_FACTOR); + + private MockAdminClient adminClient; + + + @Before + public void setUp() throws Exception { + adminClient = new MockAdminClient(cluster, broker1); + } + + @Test + public void testCreateOneTopic() throws Throwable { + Map newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC); + + WorkerUtils.createTopics(log, adminClient, newTopics, true); + assertEquals(Collections.singleton(TEST_TOPIC), 
adminClient.listTopics().names().get()); + assertEquals( + new TopicDescription( + TEST_TOPIC, false, + Collections.singletonList( + new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))), + adminClient.describeTopics( + Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get() + ); + } + + @Test + public void testCreateRetriesOnTimeout() throws Throwable { + adminClient.timeoutNextRequest(1); + + WorkerUtils.createTopics( + log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), true); + + assertEquals( + new TopicDescription( + TEST_TOPIC, false, + Collections.singletonList( + new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))), + adminClient.describeTopics( + Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get() + ); + } + + @Test + public void testCreateZeroTopicsDoesNothing() throws Throwable { + WorkerUtils.createTopics(log, adminClient, Collections.emptyMap(), true); + assertEquals(0, adminClient.listTopics().names().get().size()); + } + + @Test(expected = TopicExistsException.class) + public void testCreateTopicsFailsIfAtLeastOneTopicExists() throws Throwable { + adminClient.addTopic( + false, + TEST_TOPIC, + Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())), + null); + + Map newTopics = new HashMap<>(); + newTopics.put(TEST_TOPIC, NEW_TEST_TOPIC); + newTopics.put("another-topic", + new NewTopic("another-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR)); + newTopics.put("one-more-topic", + new NewTopic("one-more-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR)); + + WorkerUtils.createTopics(log, adminClient, newTopics, true); + } + + @Test(expected = RuntimeException.class) + public void testExistingTopicsMustHaveRequestedNumberOfPartitions() throws Throwable { + List tpInfo = new ArrayList<>(); + tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); + tpInfo.add(new TopicPartitionInfo(1, 
broker2, singleReplica, Collections.emptyList())); + adminClient.addTopic( + false, + TEST_TOPIC, + tpInfo, + null); + + WorkerUtils.createTopics( + log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false); + } + + @Test + public void testExistingTopicsNotCreated() throws Throwable { + final String existingTopic = "existing-topic"; + List tpInfo = new ArrayList<>(); + tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); + tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.emptyList())); + tpInfo.add(new TopicPartitionInfo(2, broker3, singleReplica, Collections.emptyList())); + adminClient.addTopic( + false, + existingTopic, + tpInfo, + null); + + WorkerUtils.createTopics( + log, adminClient, + Collections.singletonMap( + existingTopic, + new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR)), false); + + assertEquals(Collections.singleton(existingTopic), adminClient.listTopics().names().get()); + } + + @Test + public void testCreatesNotExistingTopics() throws Throwable { + // should be no topics before the call + assertEquals(0, adminClient.listTopics().names().get().size()); + + WorkerUtils.createTopics( + log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false); + + assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get()); + assertEquals( + new TopicDescription( + TEST_TOPIC, false, + Collections.singletonList( + new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))), + adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get() + ); + } + + @Test + public void testCreatesOneTopicVerifiesOneTopic() throws Throwable { + final String existingTopic = "existing-topic"; + List tpInfo = new ArrayList<>(); + tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); + tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, 
Collections.emptyList())); + adminClient.addTopic( + false, + existingTopic, + tpInfo, + null); + + Map topics = new HashMap<>(); + topics.put(existingTopic, + new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR)); + topics.put(TEST_TOPIC, NEW_TEST_TOPIC); + + WorkerUtils.createTopics(log, adminClient, topics, false); + + assertEquals(Utils.mkSet(existingTopic, TEST_TOPIC), adminClient.listTopics().names().get()); + } + + @Test + public void testCreateNonExistingTopicsWithZeroTopicsDoesNothing() throws Throwable { + WorkerUtils.createTopics( + log, adminClient, Collections.emptyMap(), false); + assertEquals(0, adminClient.listTopics().names().get().size()); + } + +} -- To stop receiving notification emails like this one, please contact rsivaram@apache.org.