This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dd7c036 KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent
topic's config
dd7c036 is described below
commit dd7c0369560c815482e6efc0a7ad08e0fcdf640f
Author: Brian Byrne <bbyrne@confluent.io>
AuthorDate: Sat Jun 6 21:04:04 2020 +0530
KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's
config
Author: Brian Byrne <bbyrne@confluent.io>
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Boyang Chan <boyang@confluent.io>,
Manikumar Reddy <manikumar.reddy@gmail.com>
Closes #8717 from bdbyrne/KAFKA-10033
---
.../src/main/scala/kafka/server/AdminManager.scala | 3 ++
.../kafka/server/DynamicConfigChangeTest.scala | 40 ++++++++++++++++++----
2 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 183a5d3..742156a 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -454,6 +454,9 @@ class AdminManager(val config: KafkaConfig,
private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
configProps: Properties, configEntriesMap: Map[String, String]):
(ConfigResource, ApiError) = {
val topic = resource.name
+ if (!metadataCache.contains(topic))
+ throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")
+
adminZkClient.validateTopicConfig(topic, configProps)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 30a044d..c745c92 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -18,18 +18,22 @@ package kafka.server
import java.nio.charset.StandardCharsets
import java.util.Properties
+import java.util.concurrent.ExecutionException
-import kafka.log.LogConfig._
-import kafka.server.Constants._
-import org.junit.Assert._
-import org.apache.kafka.common.metrics.Quota
-import org.easymock.EasyMock
-import org.junit.Test
import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig._
import kafka.utils._
+import kafka.server.Constants._
import kafka.zk.ConfigEntityChangeNotificationZNode
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.metrics.Quota
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.Test
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -203,6 +207,23 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
+ def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = {
+ val topic = TestUtils.tempTopic
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+ val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "10000"), AlterConfigOp.OpType.SET)
+ admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
+ fail("Should fail with UnknownTopicOrPartitionException for topic doesn't exist")
+ } catch {
+ case e: ExecutionException =>
+ assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
+ } finally {
+ admin.close()
+ }
+ }
+
+ @Test
def testProcessNotification(): Unit = {
val props = new Properties()
props.put("a.b", "10")
@@ -314,4 +335,11 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderReplicationThrottledReplicasProp,
value), 102, LeaderReplicationThrottledReplicasProp)
}
+
+ private def createAdminClient(): Admin = {
+ val props = new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ Admin.create(props)
+ }
+
}
|