kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9038; Allow creating partitions while a reassignment is in progress (#7582)
Date Fri, 25 Oct 2019 06:59:32 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 569a6f9  KAFKA-9038; Allow creating partitions while a reassignment is in progress
 (#7582)
569a6f9 is described below

commit 569a6f9cecb8eff8a9ab0811aea2bb070d11c2db
Author: NIkhil Bhatia <rite2nikhil@gmail.com>
AuthorDate: Thu Oct 24 23:40:02 2019 -0700

    KAFKA-9038; Allow creating partitions while a reassignment is in progress  (#7582)
    
    Prior to this patch, partition creation would not be allowed for any topic while a reassignment
is in progress. The PR makes this a topic level check. As long as a particular topic is not
being reassigned, we allow partitions to be increased.
    
    Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson
<jason@confluent.io>
---
 .../src/main/scala/kafka/server/AdminManager.scala | 14 ++---
 .../admin/ReassignPartitionsClusterTest.scala      | 72 ++++++++++++++++++++--
 2 files changed, 74 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 5de1bcd..d9a1732 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -259,20 +259,20 @@ class AdminManager(val config: KafkaConfig,
                        listenerName: ListenerName,
                        callback: Map[String, ApiError] => Unit): Unit = {
 
-    val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
     val allBrokers = adminZkClient.getBrokerMetadatas()
     val allBrokerIds = allBrokers.map(_.id)
 
     // 1. map over topics creating assignment and calling AdminUtils
     val metadata = newPartitions.map { case (topic, newPartition) =>
       try {
-        // We prevent addition partitions while a reassignment is in progress, since
-        // during reassignment there is no meaningful notion of replication factor
-        if (reassignPartitionsInProgress)
-          throw new ReassignmentInProgressException("A partition reassignment is in progress.")
-
         val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic)).map
{
-          case (topicPartition, assignment) => topicPartition.partition -> assignment
+          case (topicPartition, assignment) =>
+            if (assignment.isBeingReassigned) {
+              // We prevent adding partitions while a reassignment of the topic is in progress, to protect against a race
+              // condition between the controller thread processing the reassignment update and this createPartitions request.
+              throw new ReassignmentInProgressException(s"A partition reassignment is in
progress for the topic '$topic'.")
+            }
+            topicPartition.partition -> assignment
         }
         if (existingAssignment.isEmpty)
           throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 6ee2f53..5331559 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -12,8 +12,6 @@
   */
 package kafka.admin
 
-import java.util.{Collections, Properties}
-
 import kafka.admin.ReassignPartitionsCommand._
 import kafka.common.AdminCommandFailedException
 import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer}
@@ -23,19 +21,22 @@ import kafka.zk.{ReassignPartitionsZNode, ZkVersion, ZooKeeperTestHarness}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry,
NewPartitionReassignment, PartitionReassignment, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry,
NewPartitionReassignment, NewPartitions, PartitionReassignment, AdminClient => JAdminClient}
 import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq}
 import scala.util.Random
 import java.io.File
+import java.util.{Collections, Properties}
+import java.util.concurrent.ExecutionException
 
 import kafka.controller.ReplicaAssignment
 import kafka.log.LogConfig
-import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.NoReassignmentInProgressException
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.errors.{NoReassignmentInProgressException, ReassignmentInProgressException}
+import org.scalatest.Assertions.intercept
 
 class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   var servers: Seq[KafkaServer] = null
@@ -1121,6 +1122,67 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
   }
 
   /**
+   * Verifies that partitions can be created for topics that are not being reassigned, and that for topics under
+   * reassignment a ReassignmentInProgressException is thrown. The test creates two topics, `topicName` and
+   * `otherTopicName`; `topicName` undergoes partition reassignment, and the test validates that during the
+   * reassignment a createPartitions call throws ReassignmentInProgressException for `topicName` but successfully
+   * creates partitions for `otherTopicName`, which is not being reassigned. It further validates that once the
+   * reassignment of `topicName` is complete, createPartitions succeeds for that topic.
+   */
+  @Test
+  def shouldCreatePartitionsForTopicNotInReassignment(): Unit = {
+    startBrokers(Seq(100, 101))
+    val otherTopicName = "anyTopic"
+    val otp0 = new TopicPartition(otherTopicName, 0)
+    val otp1 = new TopicPartition(otherTopicName, 1)
+    adminClient = createAdminClient(servers)
+    createTopic(zkClient, topicName,
+      Map(otp0.partition() -> Seq(100),
+          otp1.partition() -> Seq(100)),
+      servers = servers)
+    createTopic(zkClient, otherTopicName,
+      Map(tp0.partition() -> Seq(100),
+          tp1.partition() -> Seq(100)),
+      servers = servers)
+
+    // Throttle to avoid race conditions
+    throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
+      tp0 -> Seq(100, 101),
+      tp1 -> Seq(100, 101)
+    ))
+
+    // Alter `topicName` partition reassignment
+    adminClient.alterPartitionReassignments(
+      Map(reassignmentEntry(tp0, Seq(101)),
+        reassignmentEntry(tp1, Seq(101))).asJava
+    ).all().get()
+    waitUntilTrue(() => {
+      !adminClient.listPartitionReassignments().reassignments().get().isEmpty
+    }, "Controller should have picked up reassignment", 1000)
+
+    def testCreatePartitions(topicName: String, isTopicBeingReassigned: Boolean): Unit =
{
+      if (isTopicBeingReassigned)
+        assertTrue("createPartitions for topic under reassignment should throw an exception",
intercept[ExecutionException](
+          adminClient.createPartitions(Map(topicName -> NewPartitions.increaseTo(4)).asJava).values.get(topicName).get()).
+          getCause.isInstanceOf[ReassignmentInProgressException])
+      else
+        adminClient.createPartitions(Map(topicName -> NewPartitions.increaseTo(4)).asJava).values.get(topicName).get()
+    }
+
+    // Test case: createPartitions throws ReassignmentInProgressException for topics with partitions in reassignment.
+    testCreatePartitions(topicName, true)
+    // Test case: createPartitions is successful for Topics with partitions NOT in reassignment.
+    testCreatePartitions(otherTopicName, false)
+
+    // complete reassignment
+    resetBrokersThrottle()
+    waitForAllReassignmentsToComplete()
+
+    // Test case: createPartitions is successful for a topic once its partition reassignment has completed.
+    testCreatePartitions(topicName, false)
+  }
+
+  /**
     * Asserts that a replica is being reassigned from the given replicas to the target replicas
     */
   def assertIsReassigning(from: Seq[Int], to: Seq[Int], reassignment: PartitionReassignment):
Unit = {


Mime
View raw message