This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 1e17030 KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists
when --bootstrap-server is used (#8737)
1e17030 is described below
commit 1e1703006e5f787091810aef16d9bce9ceb68b21
Author: vinoth chandar <vinothchandar@users.noreply.github.com>
AuthorDate: Tue Jun 2 16:17:05 2020 -0700
KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server
is used (#8737)
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
(cherry picked from commit ea6d373a93205bad41b42ad9a2b2bec717c11c93)
---
core/src/main/scala/kafka/admin/TopicCommand.scala | 112 ++++++++++++---------
.../admin/TopicCommandWithAdminClientTest.scala | 38 +++++--
2 files changed, 95 insertions(+), 55 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 86dc84e..8c1629b 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -228,7 +228,7 @@ object TopicCommand extends Logging {
if (topic.partitions.exists(partitions => partitions < 1))
throw new IllegalArgumentException(s"The partitions must be greater than 0")
- if (!adminClient.listTopics().names().get().contains(topic.name)) {
+ try {
val newTopic = if (topic.hasReplicaAssignment)
new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
else {
@@ -247,8 +247,12 @@ object TopicCommand extends Logging {
val createResult = adminClient.createTopics(Collections.singleton(newTopic))
createResult.all().get()
println(s"Created topic ${topic.name}.")
- } else {
- throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
+ } catch {
+ case e : ExecutionException =>
+ if (e.getCause == null)
+ throw e
+ if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()))
+ throw e.getCause
}
}
@@ -259,24 +263,29 @@ object TopicCommand extends Logging {
override def alterTopic(opts: TopicCommandOptions): Unit = {
val topic = new CommandTopicPartition(opts)
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
- ensureTopicExists(topics, opts.topic)
- val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
- adminClient.createPartitions(topics.map {topicName =>
- if (topic.hasReplicaAssignment) {
- val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
- val newAssignment = {
- val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
- new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+ ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+ if (topics.nonEmpty) {
+ val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
+ adminClient.createPartitions(topics.map { topicName =>
+ if (topic.hasReplicaAssignment) {
+ val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
+ val newAssignment = {
+ val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
+ new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+ }
+ topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment)
+ } else {
+ topicName -> NewPartitions.increaseTo(topic.partitions.get)
}
- topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment)
- } else {
- topicName -> NewPartitions.increaseTo(topic.partitions.get)
- }}.toMap.asJava).all().get()
+ }.toMap.asJava).all().get()
+ }
}
- private def listAllReassignments(): Map[TopicPartition, PartitionReassignment] = {
+ private def listAllReassignments(topicPartitions: util.Set[TopicPartition]): Map[TopicPartition,
PartitionReassignment] = {
try {
- adminClient.listPartitionReassignments(new ListPartitionReassignmentsOptions).reassignments().get().asScala
+ adminClient.listPartitionReassignments(topicPartitions, new ListPartitionReassignmentsOptions)
+ .reassignments().get().asScala
} catch {
case e: ExecutionException =>
e.getCause match {
@@ -290,33 +299,40 @@ object TopicCommand extends Logging {
override def describeTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
- val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC,
_)).asJavaCollection).values()
- val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
- val reassignments = listAllReassignments()
- val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
- val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
- for (td <- topicDescriptions) {
- val topicName = td.name
- val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
- val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
- if (describeOptions.describeConfigs) {
- val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
- if (!opts.reportOverriddenConfigs || hasNonDefault) {
- val numPartitions = td.partitions().size
- val firstPartition = td.partitions.iterator.next()
- val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
- val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition,
reassignment), config, markedForDeletion = false)
- topicDesc.printDescription()
+ ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+ if (topics.nonEmpty) {
+ val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC,
_)).asJavaCollection).values()
+ val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
+ val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
+ val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+ val topicPartitions = topicDescriptions
+ .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(),
p.partition())))
+ .toSet.asJava
+ val reassignments = listAllReassignments(topicPartitions)
+
+ for (td <- topicDescriptions) {
+ val topicName = td.name
+ val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
+ val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+ if (describeOptions.describeConfigs) {
+ val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+ if (!opts.reportOverriddenConfigs || hasNonDefault) {
+ val numPartitions = td.partitions().size
+ val firstPartition = td.partitions.iterator.next()
+ val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
+ val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition,
reassignment), config, markedForDeletion = false)
+ topicDesc.printDescription()
+ }
}
- }
- if (describeOptions.describePartitions) {
- for (partition <- sortedPartitions) {
- val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
- val partitionDesc = PartitionDescription(topicName, partition, Some(config),
markedForDeletion = false, reassignment)
- describeOptions.maybePrintPartitionDescription(partitionDesc)
+ if (describeOptions.describePartitions) {
+ for (partition <- sortedPartitions) {
+ val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
+ val partitionDesc = PartitionDescription(topicName, partition, Some(config),
markedForDeletion = false, reassignment)
+ describeOptions.maybePrintPartitionDescription(partitionDesc)
+ }
}
}
}
@@ -324,7 +340,7 @@ object TopicCommand extends Logging {
override def deleteTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
- ensureTopicExists(topics, opts.topic)
+ ensureTopicExists(topics, opts.topic, !opts.ifExists)
adminClient.deleteTopics(topics.asJavaCollection).all().get()
}
@@ -501,7 +517,7 @@ object TopicCommand extends Logging {
* If set to true, the command will throw an exception if the
topic with the
* requested name does not exist.
*/
- private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String],
requireTopicExists: Boolean = true): Unit = {
+ private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String],
requireTopicExists: Boolean): Unit = {
// If no topic name was mentioned, do not need to throw exception.
if (requestedTopic.isDefined && requireTopicExists && foundTopics.isEmpty)
{
// If given topic doesn't exist then throw exception
@@ -639,9 +655,9 @@ object TopicCommand extends Logging {
private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
"if set when describing topics, only show topics that have overridden configs")
private val ifExistsOpt = parser.accepts("if-exists",
- "if set when altering or deleting or describing topics, the action will only execute
if the topic exists. Not supported with the --bootstrap-server option.")
+ "if set when altering or deleting or describing topics, the action will only execute
if the topic exists.")
private val ifNotExistsOpt = parser.accepts("if-not-exists",
- "if set when creating topics, the action will only execute if the topic does not already
exist. Not supported with the --bootstrap-server option.")
+ "if set when creating topics, the action will only execute if the topic does not already
exist.")
private val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware
replica assignment")
@@ -736,8 +752,8 @@ object TopicCommand extends Logging {
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt
+ topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
- CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts --
Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts
-- Set(createOpt) ++ Set(bootstrapServerOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts --
Set(alterOpt, deleteOpt, describeOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts
-- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts
-- Set(listOpt, describeOpt))
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 04712d6..fdf110c 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
@@ -214,7 +215,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with
Loggin
}
@Test
- def testCreateIfItAlreadyExists(): Unit = {
+ def testCreateWhenAlreadyExists(): Unit = {
val numPartitions = 1
// create the topic
@@ -223,12 +224,19 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
createAndWaitTopic(createOpts)
// try to re-create the topic
- intercept[IllegalArgumentException] {
+ intercept[TopicExistsException] {
topicService.createTopic(createOpts)
}
}
@Test
+ def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+ val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
+ createAndWaitTopic(createOpts)
+ topicService.createTopic(createOpts)
+ }
+
+ @Test
def testCreateWithReplicaAssignment(): Unit = {
// create the topic
val createOpts = new TopicCommandOptions(
@@ -431,10 +439,9 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
}
@Test
- def testIfExistsAndIfNotExistsOptionsInvalidWithBootstrapServers(): Unit = {
- // alter a topic that does not exist without --if-exists
- assertCheckArgsExitCode(1, new TopicCommandOptions(Array("--bootstrap-server", "server1:9092",
"--alter", "--if-exists", "--topic", testTopicName, "--partitions", "1")))
- assertCheckArgsExitCode(1, new TopicCommandOptions(Array("--bootstrap-server", "server1:9092",
"--create", "--if-not-exists", "--topic", testTopicName, "--partitions", "1", "--replication-factor",
"1")))
+ def testAlterWhenTopicDoesntExistWithIfExists(): Unit = {
+ topicService.alterTopic(new TopicCommandOptions(
+ Array("--topic", testTopicName, "--partitions", "1", "--if-exists")))
}
@Test
@@ -530,7 +537,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with
Loggin
}
@Test
- def testDeleteIfExists(): Unit = {
+ def testDeleteWhenTopicDoesntExist(): Unit = {
// delete a topic that does not exist
val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
intercept[IllegalArgumentException] {
@@ -539,6 +546,11 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
}
@Test
+ def testDeleteWhenTopicDoesntExistWithIfExists(): Unit = {
+ topicService.deleteTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
+ }
+
+ @Test
def testDescribe(): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get()
@@ -552,6 +564,18 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
}
@Test
+ def testDescribeWhenTopicDoesntExist(): Unit = {
+ intercept[IllegalArgumentException] {
+ topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName)))
+ }
+ }
+
+ @Test
+ def testDescribeWhenTopicDoesntExistWithIfExists(): Unit = {
+ topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
+ }
+
+ @Test
def testDescribeUnavailablePartitions(): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 6, 1.toShort))).all().get()
|