kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2393: Correctly Handle InvalidTopicException in KafkaApis.getTo…
Date Wed, 05 Aug 2015 22:56:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 85d8218ef -> 27d499fe6


KAFKA-2393: Correctly Handle InvalidTopicException in KafkaApis.getTopicMetadata()

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #117 from granthenke/invalid-topic and squashes the following commits:

0abda5f [Grant Henke] KAFKA-2393: Correctly Handle InvalidTopicException in KafkaApis.getTopicMetadata()


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27d499fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27d499fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27d499fe

Branch: refs/heads/trunk
Commit: 27d499fe66fc034669a1852ca33ee2f486e1b83c
Parents: 85d8218
Author: Grant Henke <granthenke@gmail.com>
Authored: Wed Aug 5 15:55:48 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Aug 5 15:55:48 2015 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  5 +++-
 .../kafka/integration/TopicMetadataTest.scala   | 28 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27d499fe/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 18f5b5b..7ea509c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -462,10 +462,13 @@ class KafkaApis(val requestChannel: RequestChannel,
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                    .format(topic, config.numPartitions, config.defaultReplicationFactor))
             }
+            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
           } catch {
             case e: TopicExistsException => // let it go, possibly another broker created this topic
+              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+            case itex: InvalidTopicException =>
+              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.InvalidTopicCode)
           }
-          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
         } else {
           new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d499fe/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 5b6c9d6..9aebec4 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -135,6 +135,34 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertTrue(partitionMetadata.head.leader.isDefined)
   }
 
+  def testAutoCreateTopicWithCollision {
+    // auto create topic
+    val topic1 = "testAutoCreate_Topic"
+    val topic2 = "testAutoCreate.Topic"
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
+    assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
+    assertEquals("Expecting InvalidTopicCode for topic2 metadata", ErrorMapping.InvalidTopicCode, topicsMetadata(1).errorCode)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
+
+    // retry the metadata for the first auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+    assertTrue(partitionMetadata.head.leader.isDefined)
+  }
+
   private def checkIsr(servers: Seq[KafkaServer]): Unit = {
     val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
     val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map(


Mime
View raw message