kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1073; CheckReassignmentStatus is broken; patched by Jun Rao; reviewed by Guozhang Wang, Swapnil Ghike and Neha Narkhede
Date Mon, 07 Oct 2013 16:22:08 GMT
Updated Branches:
  refs/heads/0.8 2c6d3c7b4 -> 71ed6ca33


kafka-1073; CheckReassignmentStatus is broken; patched by Jun Rao; reviewed by Guozhang Wang,
Swapnil Ghike and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71ed6ca3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71ed6ca3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71ed6ca3

Branch: refs/heads/0.8
Commit: 71ed6ca3368ff38909f502565a4bf0f39e70fc6c
Parents: 2c6d3c7
Author: Jun Rao <junrao@gmail.com>
Authored: Mon Oct 7 09:22:12 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Oct 7 09:22:12 2013 -0700

----------------------------------------------------------------------
 bin/kafka-check-reassignment-status.sh          |  17 ---
 .../kafka/admin/CheckReassignmentStatus.scala   | 110 -------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  74 +++++++++++--
 3 files changed, 63 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/71ed6ca3/bin/kafka-check-reassignment-status.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-check-reassignment-status.sh b/bin/kafka-check-reassignment-status.sh
deleted file mode 100755
index 1f21858..0000000
--- a/bin/kafka-check-reassignment-status.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-$(dirname $0)/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/71ed6ca3/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
deleted file mode 100644
index 7e85f87..0000000
--- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import joptsimple.OptionParser
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils._
-import scala.collection.Map
-import kafka.common.TopicAndPartition
-
-object CheckReassignmentStatus extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the
list of partitions and the " +
-      "new replicas they should be reassigned to")
-      .withRequiredArg
-      .describedAs("partition reassignment json file path")
-      .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the
zookeeper connection in the " +
-      "form host:port. Multiple URLS can be given to allow fail-over.")
-      .withRequiredArg
-      .describedAs("urls")
-      .ofType(classOf[String])
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(jsonFileOpt, zkConnectOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val jsonFile = options.valueOf(jsonFileOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    val jsonString = Utils.readFileAsString(jsonFile)
-    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-
-    try {
-      // read the json file into a string
-      val partitionsToBeReassigned = Json.parseFull(jsonString) match {
-        case Some(reassignedPartitions) =>
-          val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
-          partitions.map { m =>
-            val topic = m.asInstanceOf[Map[String, String]].get("topic").get
-            val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
-            val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
-            val newReplicas = replicasList.split(",").map(_.toInt)
-            (TopicAndPartition(topic, partition), newReplicas.toSeq)
-          }.toMap
-        case None => Map.empty[TopicAndPartition, Seq[Int]]
-      }
-
-      val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
-      reassignedPartitionsStatus.foreach { partition =>
-        partition._2 match {
-          case ReassignmentCompleted =>
-            println("Partition %s reassignment completed successfully".format(partition._1))
-          case ReassignmentFailed =>
-            println("Partition %s reassignment failed".format(partition._1))
-          case ReassignmentInProgress =>
-            println("Partition %s reassignment in progress".format(partition._1))
-        }
-      }
-    }
-  }
-
-  def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]])
-  :Map[TopicAndPartition, ReassignmentStatus] = {
-    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
-    // for all partitions whose replica reassignment is complete, check the status
-    partitionsToBeReassigned.map { topicAndPartition =>
-      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
-        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
-    }
-  }
-
-  def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
-                                            reassignedReplicas: Seq[Int],
-                                            partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]],
-                                            partitionsBeingReassigned: Map[TopicAndPartition,
Seq[Int]]): ReassignmentStatus = {
-    val newReplicas = partitionsToBeReassigned(topicAndPartition)
-    partitionsBeingReassigned.get(topicAndPartition) match {
-      case Some(partition) => ReassignmentInProgress
-      case None =>
-        // check if AR == RAR
-        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic,
topicAndPartition.partition)
-        if(assignedReplicas == newReplicas)
-          ReassignmentCompleted
-        else
-          ReassignmentFailed
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/71ed6ca3/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index f333d29..c6fc4ab 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -58,6 +58,12 @@ object ReassignPartitionsCommand extends Logging {
       .describedAs("execute")
       .ofType(classOf[String])
 
+    val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The
JSON file with the list of partitions and the " +
+      "new replicas they should be reassigned to, which can be obtained from the output of
a dry run.")
+      .withRequiredArg
+      .describedAs("partition reassignment json file path")
+      .ofType(classOf[String])
+
     val options = parser.parse(args : _*)
 
     for(arg <- List(zkConnectOpt)) {
@@ -80,7 +86,24 @@ object ReassignPartitionsCommand extends Logging {
 
       var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition,
List[Int]]()
 
-      if(options.has(topicsToMoveJsonFileOpt)) {
+      if(options.has(statusCheckJsonFileOpt)) {
+        val jsonFile = options.valueOf(statusCheckJsonFileOpt)
+        val jsonString = Utils.readFileAsString(jsonFile)
+        val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+
+        println("Status of partition reassignment:")
+        val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+        reassignedPartitionsStatus.foreach { partition =>
+          partition._2 match {
+            case ReassignmentCompleted =>
+              println("Reassignment of partition %s completed successfully".format(partition._1))
+            case ReassignmentFailed =>
+              println("Reassignment of partition %s failed".format(partition._1))
+            case ReassignmentInProgress =>
+              println("Reassignment of partition %s is still in progress".format(partition._1))
+          }
+        }
+      } else if(options.has(topicsToMoveJsonFileOpt)) {
         val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
         val brokerList = options.valueOf(brokerListOpt)
         val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
@@ -107,16 +130,19 @@ object ReassignPartitionsCommand extends Logging {
         System.exit(1)
       }
 
-      if (options.has(executeOpt)) {
-        val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
-
-        if(reassignPartitionsCommand.reassignPartitions())
-          println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
-        else
-          println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
-      } else {
-        System.out.println("This is a dry run (Use --execute to do the actual reassignment.
" +
-          "The replica assignment is \n" + partitionsToBeReassigned.toString())
+      if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt))
{
+        if (options.has(executeOpt)) {
+          val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+
+          if(reassignPartitionsCommand.reassignPartitions())
+            println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+          else
+            println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+        } else {
+          System.out.println("This is a dry run (Use --execute to do the actual reassignment.
" +
+            "The following is the replica assignment. Save it for the status check option.\n"
+
+            ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))
+        }
       }
     } catch {
       case e: Throwable =>
@@ -127,6 +153,32 @@ object ReassignPartitionsCommand extends Logging {
         zkClient.close()
     }
   }
+
+  private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned:
Map[TopicAndPartition, Seq[Int]])
+  :Map[TopicAndPartition, ReassignmentStatus] = {
+    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+    partitionsToBeReassigned.map { topicAndPartition =>
+      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
+        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
+    }
+  }
+
+  private def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition:
TopicAndPartition,
+                                            reassignedReplicas: Seq[Int],
+                                            partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]],
+                                            partitionsBeingReassigned: Map[TopicAndPartition,
Seq[Int]]): ReassignmentStatus = {
+    val newReplicas = partitionsToBeReassigned(topicAndPartition)
+    partitionsBeingReassigned.get(topicAndPartition) match {
+      case Some(partition) => ReassignmentInProgress
+      case None =>
+        // check if the current replica assignment matches the expected one after reassignment
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic,
topicAndPartition.partition)
+        if(assignedReplicas == newReplicas)
+          ReassignmentCompleted
+        else
+          ReassignmentFailed
+    }
+  }
 }
 
 class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition,
collection.Seq[Int]])


Mime
View raw message