kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)
Date Thu, 21 Jun 2018 23:48:35 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 6c2715f  KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)
6c2715f is described below

commit 6c2715f0e72b3d29e0fc0edf9c4694dfab8c3287
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Thu Jun 21 16:47:44 2018 -0700

    KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)
    
    This is an unexpected exception so `UnknownServerException`
    is thrown back to the client.
    
    This is a minimal change to make the behaviour match `ZkUtils`.
    This is better, but one could argue that it's not perfect. A more
    sophisticated approach can be tackled separately.
    
    Added a concurrent test that fails without this change.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  1 -
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 15 ++++++++++---
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    | 26 +++++++++++++++++++++-
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 8a6b3ee..060c0b4 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -93,7 +93,6 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                                                      update: Boolean = false) {
     validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
 
-    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
     if (!update) {
       // write out the config if there is any, this isn't transactional with the partition assignments
       zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index bb34294..ec4932a 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -246,6 +246,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Sets or creates the entity znode path with the given configs depending
    * on whether it already exists or not.
+   *
+   * If this method is called concurrently, the last writer wins. In cases where we update configs and then
+   * partition assignment (i.e. create topic), it's possible for one thread to set this and the other to set the
+   * partition assignment. As such, the recommendation is to never call create topic for the same topic with different
+   * configs/partition assignment concurrently.
+   *
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
@@ -257,16 +263,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
       retryRequestUntilConnected(setDataRequest)
     }
 
-    def create(configData: Array[Byte]) = {
+    def createOrSet(configData: Array[Byte]): Unit = {
       val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
-      createRecursive(path, ConfigEntityZNode.encode(config))
+      try createRecursive(path, ConfigEntityZNode.encode(config))
+      catch {
+        case _: NodeExistsException => set(configData).maybeThrow
+      }
     }
 
     val configData = ConfigEntityZNode.encode(config)
 
     val setDataResponse = set(configData)
     setDataResponse.resultCode match {
-      case Code.NONODE => create(configData)
+      case Code.NONODE => createOrSet(configData)
       case _ => setDataResponse.maybeThrow
     }
   }
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index fe5fbff..39745e5 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -28,8 +28,10 @@ import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -132,7 +134,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
   }
 
   @Test
-  def testConcurrentTopicCreation() {
+  def testMockedConcurrentTopicCreation() {
     val topic = "test.topic"
 
     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
@@ -147,6 +149,28 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     }
   }
 
+  @Test
+  def testConcurrentTopicCreation() {
+    val topic = "test-concurrent-topic-creation"
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+    val props = new Properties
+    props.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
+    def createTopic(): Unit = {
+      try adminZkClient.createTopic(topic, 3, 1, props)
+      catch { case _: TopicExistsException => () }
+      val (_, partitionAssignment) = zkClient.getPartitionAssignmentForTopics(Set(topic)).head
+      assertEquals(3, partitionAssignment.size)
+      partitionAssignment.foreach { case (partition, replicas) =>
+        assertEquals(s"Unexpected replication factor for $partition", 1, replicas.size)
+      }
+      val savedProps = zkClient.getEntityConfigs(ConfigType.Topic, topic)
+      assertEquals(props, savedProps)
+    }
+
+    TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(createTopic, createTopic),
+      JTestUtils.DEFAULT_MAX_WAIT_MS.toInt)
+  }
+
   /**
    * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
    * then changes the config and checks that the new values take effect.


Mime
View raw message