kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4687; Fix InvalidTopicException due to topic creation race condition
Date Tue, 24 Jan 2017 20:26:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3df60e7eb -> 8dfb8afb9


KAFKA-4687; Fix InvalidTopicException due to topic creation race condition

We now throw the correct TopicExistsException instead.

Author: Andrew Olson <aolson1@cerner.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2425 from noslowerdna/KAFKA-4687


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8dfb8afb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8dfb8afb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8dfb8afb

Branch: refs/heads/trunk
Commit: 8dfb8afb93f3f1026f868466edb66a4cc63b4455
Parents: 3df60e7
Author: Andrew Olson <aolson1@cerner.com>
Authored: Tue Jan 24 20:20:30 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jan 24 20:24:18 2017 +0000

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala  | 18 ++++++++++--------
 .../test/scala/unit/kafka/admin/AdminTest.scala   | 16 ++++++++++++++++
 .../kafka/server/CreateTopicsRequestTest.scala    |  2 +-
 .../CreateTopicsRequestWithPolicyTest.scala       |  2 +-
 4 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfb8afb/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 02d5fe0..65ac91c 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -386,7 +386,7 @@ object AdminUtils extends Logging with AdminUtilities {
   }
 
   def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
-    zkUtils.zkClient.exists(getTopicPath(topic))
+    zkUtils.pathExists(getTopicPath(topic))
 
   def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
@@ -425,16 +425,18 @@ object AdminUtils extends Logging with AdminUtilities {
     // validate arguments
     Topic.validate(topic)
 
-    val topicPath = getTopicPath(topic)
-
     if (!update) {
-      if (zkUtils.zkClient.exists(topicPath))
-        throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
+      if (topicExists(zkUtils, topic))
+        throw new TopicExistsException(s"Topic '$topic' already exists.")
       else if (Topic.hasCollisionChars(topic)) {
         val allTopics = zkUtils.getAllTopics()
-        val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t))
+        // check again in case the topic was created in the meantime, otherwise the
+        // topic could potentially collide with itself
+        if (allTopics.contains(topic))
+          throw new TopicExistsException(s"Topic '$topic' already exists.")
+        val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
         if (collidingTopics.nonEmpty) {
-          throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
+          throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
         }
       }
     }
@@ -484,7 +486,7 @@ object AdminUtils extends Logging with AdminUtilities {
       }
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
-      case _: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
+      case _: ZkNodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfb8afb/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index f9eb61c..aa3dbc4 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -20,6 +20,7 @@ import kafka.server.DynamicConfig.Broker._
 import kafka.server.KafkaConfig._
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 import java.util.Properties
@@ -159,6 +160,21 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     }
   }
 
+  @Test
+  def testConcurrentTopicCreation() {
+    val topic = "test.topic"
+
+    // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
+    val zkMock = EasyMock.createNiceMock(classOf[ZkUtils])
+    EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false)
+    EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic"))
+    EasyMock.replay(zkMock)
+
+    intercept[TopicExistsException] {
+      AdminUtils.validateCreateOrUpdateTopic(zkMock, topic, Map.empty, new Properties, update = false)
+    }
+  }
+
   private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = {
     servers.filter(server => new File(server.config.logDirs.head, topic + "-" + partitionId).exists)
            .map(_.config.brokerId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfb8afb/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index e3b0bbe..6efa189 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -63,7 +63,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
 
     // Basic
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
-      Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("""Topic "existing-topic" already exists."""))))
+      Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic 'existing-topic' already exists."))))
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort)).asJava, timeout).build(),
       Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false)
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort)).asJava, timeout).build(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfb8afb/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 80f6e9e..9affea4 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -77,7 +77,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
     // Check that basic errors still work
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map(existingTopic -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build(),
-      Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("""Topic "existing-topic" already exists."""))))
+      Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic 'existing-topic' already exists."))))
 
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map("error-replication" -> new CreateTopicsRequest.TopicDetails(10, (numBrokers + 1).toShort)).asJava, timeout, true).build(),


Mime
View raw message