kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6778; AdminClient.describeConfigs() should return error for non-existent topics (#4866)
Date Thu, 26 Apr 2018 21:01:12 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff1875f  KAFKA-6778; AdminClient.describeConfigs() should return error for non-existent
topics (#4866)
ff1875f is described below

commit ff1875fce0a82737069e195060b6a93881954a23
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Fri Apr 27 02:31:04 2018 +0530

    KAFKA-6778; AdminClient.describeConfigs() should return error for non-existent topics
(#4866)
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../src/main/scala/kafka/server/AdminManager.scala | 12 ++++++++----
 .../kafka/api/AdminClientIntegrationTest.scala     | 22 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index b54defc..01457a1 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -304,10 +304,14 @@ class AdminManager(val config: KafkaConfig,
           case ResourceType.TOPIC =>
             val topic = resource.name
             Topic.validate(topic)
-            // Consider optimizing this by caching the configs or retrieving them from the
`Log` when possible
-            val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
-            val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config),
topicProps)
-            createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig,
topicProps, includeSynonyms))
+            if (metadataCache.contains(topic)) {
+              // Consider optimizing this by caching the configs or retrieving them from
the `Log` when possible
+              val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+              val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config),
topicProps)
+              createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig,
topicProps, includeSynonyms))
+            } else {
+              new DescribeConfigsResponse.Config(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
null), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+            }
 
           case ResourceType.BROKER =>
             if (resource.name == null || resource.name.isEmpty)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 33c14c6..b31c09d 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -49,6 +49,7 @@ import scala.collection.JavaConverters._
 import java.lang.{Long => JLong}
 
 import kafka.zk.KafkaZkClient
+import org.scalatest.Assertions.intercept
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
@@ -821,6 +822,27 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     client.close()
   }
 
+  @Test
+  def testDescribeConfigsForTopic(): Unit = {
+    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    client = AdminClient.create(createConfig)
+
+    val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+    client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get()
+
+    val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC, "unknown")
+    val describeResult1 = client.describeConfigs(Collections.singletonList(nonExistentTopic))
+
+    assertTrue(intercept[ExecutionException](describeResult1.values.get(nonExistentTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException])
+
+    val invalidTopic = new ConfigResource(ConfigResource.Type.TOPIC, "(invalid topic)")
+    val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic))
+
+    assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
+
+    client.close()
+  }
+
   private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte],
Array[Byte]]): Unit = {
     consumer.subscribe(Collections.singletonList(topic))
     TestUtils.waitUntilTrue(() => {

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message