kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5928; Avoid redundant requests to zookeeper when reassign topic partition
Date Tue, 07 Aug 2018 03:05:41 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0d840d  KAFKA-5928; Avoid redundant requests to zookeeper when reassign topic partition
b0d840d is described below

commit b0d840d34b4172add831367e8fc2c51e75efb549
Author: uncleGen <hustyugm@gmail.com>
AuthorDate: Mon Aug 6 20:04:55 2018 -0700

    KAFKA-5928; Avoid redundant requests to zookeeper when reassign topic partition
    
    Author: uncleGen <hustyugm@gmail.com>
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Dong Lin <lindong28@gmail.com>
    
    Closes #3894 from uncleGen/KAFKA-5928
---
 .../kafka/admin/ReassignPartitionsCommand.scala    | 74 +++++++++++-----------
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |  5 +-
 .../admin/ReassignPartitionsCommandTest.scala      | 15 ++---
 3 files changed, 44 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4d9da90..041375a 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -347,13 +347,24 @@ object ReassignPartitionsCommand extends Logging {
     (partitionsToBeReassigned, replicaAssignment)
   }
 
-  private def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
+  def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
   :Map[TopicPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
-    partitionsToBeReassigned.keys.map { topicAndPartition =>
-      (topicAndPartition, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, partitionsToBeReassigned,
-        partitionsBeingReassigned))
+    val (beingReassigned, notBeingReassigned) = partitionsToBeReassigned.keys.partition { topicAndPartition =>
+      partitionsBeingReassigned.contains(topicAndPartition)
+    }
+    notBeingReassigned.groupBy(_.topic).flatMap { case (topic, partitions) =>
+      val replicasForTopic = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
+      partitions.map { topicAndPartition =>
+        val newReplicas = partitionsToBeReassigned(topicAndPartition)
+        val reassignmentStatus = replicasForTopic.get(topicAndPartition) match {
+          case Some(seq) if seq == newReplicas => ReassignmentCompleted
+          case _ => ReassignmentFailed
+        }
+        (topicAndPartition, reassignmentStatus)
+      }
+    } ++ beingReassigned.map { topicAndPartition =>
+      (topicAndPartition, ReassignmentInProgress)
     }.toMap
   }
 
@@ -398,25 +409,6 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, topicAndPartition: TopicPartition,
-                                            partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
-                                            partitionsBeingReassigned: Map[TopicPartition, Seq[Int]]): ReassignmentStatus = {
-    val newReplicas = partitionsToBeReassigned(topicAndPartition)
-    partitionsBeingReassigned.get(topicAndPartition) match {
-      case Some(_) => ReassignmentInProgress
-      case None =>
-        // check if the current replica assignment matches the expected one after reassignment
-        val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition))
-        if(assignedReplicas == newReplicas)
-          ReassignmentCompleted
-        else {
-          println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
-            " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
-          ReassignmentFailed
-        }
-    }
-  }
-
   def validateAndParseArgs(args: Array[String]): ReassignPartitionsCommandOptions = {
     val opts = new ReassignPartitionsCommandOptions(args)
 
@@ -559,7 +551,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
   private[admin] def assignThrottledReplicas(existingPartitionAssignment: Map[TopicPartition, Seq[Int]],
                                              proposedPartitionAssignment: Map[TopicPartition, Seq[Int]],
                                              adminZkClient: AdminZkClient): Unit = {
-    for (topic <- proposedPartitionAssignment.keySet.map(_.topic).toSeq) {
+    for (topic <- proposedPartitionAssignment.keySet.map(_.topic).toSeq.distinct) {
       val existingPartitionAssignmentForTopic = existingPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
       val proposedPartitionAssignmentForTopic = proposedPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
 
@@ -621,7 +613,10 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
   def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
     maybeThrottle(throttle)
     try {
-      val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkClient, p.topic, p.partition) }
+      val validPartitions = proposedPartitionAssignment.groupBy(_._1.topic())
+        .flatMap { case (topic, topicPartitionReplicas) =>
+          validatePartition(zkClient, topic, topicPartitionReplicas)
+        }
       if (validPartitions.isEmpty) false
       else {
         if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
@@ -655,21 +650,24 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
     }
   }
 
-  def validatePartition(zkClient: KafkaZkClient, topic: String, partition: Int): Boolean = {
+  def validatePartition(zkClient: KafkaZkClient, topic: String, topicPartitionReplicas: Map[TopicPartition, Seq[Int]])
+  :Map[TopicPartition, Seq[Int]] = {
     // check if partition exists
     val partitionsOpt = zkClient.getPartitionsForTopics(immutable.Set(topic)).get(topic)
-    partitionsOpt match {
-      case Some(partitions) =>
-        if(partitions.contains(partition)) {
-          true
-        } else {
-          error("Skipping reassignment of partition [%s,%d] ".format(topic, partition) +
-            "since it doesn't exist")
+    topicPartitionReplicas.filter { case (topicPartition, _) =>
+      partitionsOpt match {
+        case Some(partitions) =>
+          if (partitions.contains(topicPartition.partition())) {
+            true
+          } else {
+            error("Skipping reassignment of partition [%s,%d] ".format(topic, topicPartition.partition()) +
+              "since it doesn't exist")
+            false
+          }
+        case None => error("Skipping reassignment of partition " +
+          "[%s,%d] since topic %s doesn't exist".format(topic, topicPartition.partition(), topic))
           false
-        }
-      case None => error("Skipping reassignment of partition " +
-        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
-        false
+      }
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4d089b3..12fb479 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -132,9 +132,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = zkClient.getPartitionReassignment
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicPartition,
-        Map(topicPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
+      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, Map(topicPartition -> newReplicas))
+        .getOrElse(topicPartition, fail(s"Failed to get reassignment status for $topicPartition")) == ReassignmentFailed
     }, "Partition reassignment shouldn't complete.")
     val controllerId = zkClient.getControllerId.getOrElse(fail("Controller doesn't exist"))
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 6978f8d..213c23a 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -439,9 +439,8 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
@@ -469,9 +468,8 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
@@ -498,9 +496,8 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))


Mime
View raw message