kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
Date Sat, 06 Jun 2020 15:34:51 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dd7c036  KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
dd7c036 is described below

commit dd7c0369560c815482e6efc0a7ad08e0fcdf640f
Author: Brian Byrne <bbyrne@confluent.io>
AuthorDate: Sat Jun 6 21:04:04 2020 +0530

    KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
    
    Author: Brian Byrne <bbyrne@confluent.io>
    
    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Boyang Chan <boyang@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #8717 from bdbyrne/KAFKA-10033
---
 .../src/main/scala/kafka/server/AdminManager.scala |  3 ++
 .../kafka/server/DynamicConfigChangeTest.scala     | 40 ++++++++++++++++++----
 2 files changed, 37 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 183a5d3..742156a 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -454,6 +454,9 @@ class AdminManager(val config: KafkaConfig,
   private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
                                 configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
     val topic = resource.name
+    if (!metadataCache.contains(topic))
+      throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")
+
     adminZkClient.validateTopicConfig(topic, configProps)
     validateConfigPolicy(resource, configEntriesMap)
     if (!validateOnly) {
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 30a044d..c745c92 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -18,18 +18,22 @@ package kafka.server
 
 import java.nio.charset.StandardCharsets
 import java.util.Properties
+import java.util.concurrent.ExecutionException
 
-import kafka.log.LogConfig._
-import kafka.server.Constants._
-import org.junit.Assert._
-import org.apache.kafka.common.metrics.Quota
-import org.easymock.EasyMock
-import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig._
 import kafka.utils._
+import kafka.server.Constants._
 import kafka.zk.ConfigEntityChangeNotificationZNode
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.metrics.Quota
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -203,6 +207,23 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
   }
 
   @Test
+  def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = {
+    val topic = TestUtils.tempTopic
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+      val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "10000"), AlterConfigOp.OpType.SET)
+      admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
+      fail("Should fail with UnknownTopicOrPartitionException for topic doesn't exist")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
+    } finally {
+      admin.close()
+    }
+  }
+
+  @Test
   def testProcessNotification(): Unit = {
     val props = new Properties()
     props.put("a.b", "10")
@@ -314,4 +335,11 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
   def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
     configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderReplicationThrottledReplicasProp, value), 102, LeaderReplicationThrottledReplicasProp)
   }
+
+  private def createAdminClient(): Admin = {
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    Admin.create(props)
+  }
+
 }


Mime
View raw message