This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dd7c036 KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config dd7c036 is described below commit dd7c0369560c815482e6efc0a7ad08e0fcdf640f Author: Brian Byrne AuthorDate: Sat Jun 6 21:04:04 2020 +0530 KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config Author: Brian Byrne Reviewers: Chia-Ping Tsai , Boyang Chan , Manikumar Reddy Closes #8717 from bdbyrne/KAFKA-10033 --- .../src/main/scala/kafka/server/AdminManager.scala | 3 ++ .../kafka/server/DynamicConfigChangeTest.scala | 40 ++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 183a5d3..742156a 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -454,6 +454,9 @@ class AdminManager(val config: KafkaConfig, private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean, configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = { val topic = resource.name + if (!metadataCache.contains(topic)) + throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.") + adminZkClient.validateTopicConfig(topic, configProps) validateConfigPolicy(resource, configEntriesMap) if (!validateOnly) { diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 30a044d..c745c92 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -18,18 +18,22 @@ package kafka.server import java.nio.charset.StandardCharsets import java.util.Properties +import java.util.concurrent.ExecutionException -import kafka.log.LogConfig._ -import kafka.server.Constants._ -import org.junit.Assert._ -import org.apache.kafka.common.metrics.Quota -import org.easymock.EasyMock -import org.junit.Test import kafka.integration.KafkaServerTestHarness +import kafka.log.LogConfig._ import kafka.utils._ +import kafka.server.Constants._ import kafka.zk.ConfigEntityChangeNotificationZNode +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.UnknownTopicOrPartitionException +import org.apache.kafka.common.metrics.Quota +import org.easymock.EasyMock +import org.junit.Assert._ +import org.junit.Test import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ @@ -203,6 +207,23 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } @Test + def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = { + val topic = TestUtils.tempTopic + val admin = createAdminClient() + try { + val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic) + val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "10000"), AlterConfigOp.OpType.SET) + admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get + fail("Should fail with UnknownTopicOrPartitionException for topic doesn't exist") + } catch { + case e: ExecutionException => + assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) + } finally { + admin.close() + } + } + + @Test def testProcessNotification(): Unit = { val props = new Properties() props.put("a.b", "10") @@ -314,4 +335,11 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = { configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderReplicationThrottledReplicasProp, value), 102, LeaderReplicationThrottledReplicasProp) } + + private def createAdminClient(): Admin = { + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList) + Admin.create(props) + } + }