kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: MINOR: Remove unused parameter in `checkIfPartitionReassignmentSucceeded` and clean-ups
Date Wed, 20 Jul 2016 02:04:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 72c6b7d84 -> 9cad9dbda


MINOR: Remove unused parameter in `checkIfPartitionReassignmentSucceeded` and clean-ups

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1635 from ijuma/remove-unused-parameter-in-check-if-partition-reassignment-succeeded
and squashes the following commits:

f9ed930 [Ismael Juma] Code style improvements in `ReassignPartitionsCommand`
66c7541 [Ismael Juma] Fix comment in `KafkaController.onPartitionReassignment`
85288f3 [Ismael Juma] Remove unused parameter from `checkIfPartitionReassignmentSucceeded`


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cad9dbd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cad9dbd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cad9dbd

Branch: refs/heads/trunk
Commit: 9cad9dbdac273e8fe893b0cbe4f0f22d4271fb7b
Parents: 72c6b7d
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Jul 19 19:04:24 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Jul 19 19:04:24 2016 -0700

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala | 45 +++++++++-----------
 .../kafka/controller/KafkaController.scala      |  4 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  6 +--
 .../unit/kafka/admin/DeleteTopicTest.scala      |  2 +-
 4 files changed, 25 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9cad9dbd/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index fae0a40..18f741e 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -54,11 +54,7 @@ object ReassignPartitionsCommand extends Logging {
       case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
         println(Utils.stackTrace(e))
-    } finally {
-      val zkClient = zkUtils.zkClient
-      if (zkClient != null)
-        zkClient.close()
-    }
+    } finally zkUtils.close()
   }
 
   def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
@@ -70,14 +66,14 @@ object ReassignPartitionsCommand extends Logging {
 
     println("Status of partition reassignment:")
     val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
-    reassignedPartitionsStatus.foreach { partition =>
-      partition._2 match {
+    reassignedPartitionsStatus.foreach { case (topicPartition, status) =>
+      status match {
         case ReassignmentCompleted =>
-          println("Reassignment of partition %s completed successfully".format(partition._1))
+          println("Reassignment of partition %s completed successfully".format(topicPartition))
         case ReassignmentFailed =>
-          println("Reassignment of partition %s failed".format(partition._1))
+          println("Reassignment of partition %s failed".format(topicPartition))
         case ReassignmentInProgress =>
-          println("Reassignment of partition %s is still in progress".format(partition._1))
+          println("Reassignment of partition %s is still in progress".format(topicPartition))
       }
     }
   }
@@ -128,20 +124,20 @@ object ReassignPartitionsCommand extends Logging {
     executeAssignment(zkUtils, reassignmentJsonString)
   }
 
-  def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String){
+  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String) {
 
     val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file is empty")
-    val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{
case(tp,replicas) => tp})
+    val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map
{ case (tp, _) => tp })
     if (duplicateReassignedPartitions.nonEmpty)
       throw new AdminCommandFailedException("Partition reassignment contains duplicate topic
partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
-    val duplicateEntries= partitionsToBeReassigned
-      .map{ case(tp,replicas) => (tp, CoreUtils.duplicates(replicas))}
-      .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
+    val duplicateEntries = partitionsToBeReassigned
+      .map { case (tp, replicas) => (tp, CoreUtils.duplicates(replicas))}
+      .filter { case (tp, duplicatedReplicas) => duplicatedReplicas.nonEmpty }
     if (duplicateEntries.nonEmpty) {
       val duplicatesMsg = duplicateEntries
-        .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp,
duplicateReplicas.mkString(",")) }
+        .map { case (tp, duplicateReplicas) => "%s contains multiple entries for %s".format(tp,
duplicateReplicas.mkString(",")) }
         .mkString(". ")
       throw new AdminCommandFailedException("Partition replica lists may not contain duplicate
entries: %s".format(duplicatesMsg))
     }
@@ -151,7 +147,7 @@ object ReassignPartitionsCommand extends Logging {
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file
option during rollback"
       .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
     // start the reassignment
-    if(reassignPartitionsCommand.reassignPartitions())
+    if (reassignPartitionsCommand.reassignPartitions())
       println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
     else
       println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
@@ -160,14 +156,13 @@ object ReassignPartitionsCommand extends Logging {
   private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]])
   :Map[TopicAndPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-    partitionsToBeReassigned.map { topicAndPartition =>
-      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkUtils,topicAndPartition._1,
-        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
-    }
+    partitionsToBeReassigned.keys.map { topicAndPartition =>
+      (topicAndPartition, checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
partitionsToBeReassigned,
+        partitionsBeingReassigned))
+    }.toMap
   }
 
   def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
-                                            reassignedReplicas: Seq[Int],
                                             partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]],
                                             partitionsBeingReassigned: Map[TopicAndPartition,
Seq[Int]]): ReassignmentStatus = {
     val newReplicas = partitionsToBeReassigned(topicAndPartition)
@@ -229,9 +224,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top
   def reassignPartitions(): Boolean = {
     try {
       val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic,
p._1.partition))
-      if(validPartitions.isEmpty) {
-        false
-      }
+      if (validPartitions.isEmpty) false
       else {
         val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions)
         zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
@@ -253,7 +246,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top
       case Some(partitions) =>
         if(partitions.contains(partition)) {
           true
-        }else{
+        } else {
           error("Skipping reassignment of partition [%s,%d] ".format(topic, partition) +
             "since it doesn't exist")
           false

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cad9dbd/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 8b89c8d..0d6f048 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -541,9 +541,9 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val
brokerStat
    *    In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader
from adding any replica in
    *    RAR - OAR back in the isr.
    * 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica
state change, we shrink the
-   *    isr to remove OAR - RAR in zookeeper and sent a LeaderAndIsr ONLY to the Leader to
notify it of the shrunk isr.
+   *    isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to
notify it of the shrunk isr.
    *    After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
-   * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica
(delete = false) to
+   * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica
(delete = true) to
    *    the replicas in OAR - RAR to physically delete the replicas on disk.
    * 10. Update AR in ZK with RAR.
    * 11. Update the /admin/reassign_partitions path in ZK to remove this partition.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cad9dbd/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 238cbad..7cc25c8 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -175,7 +175,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
         val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
newReplicas,
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
@@ -206,7 +206,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
         val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
newReplicas,
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
           Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
@@ -237,7 +237,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
         val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
newReplicas,
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
           Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cad9dbd/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 1e1a98c..ac23941 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -122,7 +122,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
       val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
newReplicas,
+      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
     }, "Partition reassignment shouldn't complete.")
     val controllerId = zkUtils.getController()


Mime
View raw message