kafka-1073; CheckReassignmentStatus is broken; patched by Jun Rao; reviewed by Guozhang Wang,
Swapnil Ghike and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71ed6ca3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71ed6ca3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71ed6ca3
Branch: refs/heads/trunk
Commit: 71ed6ca3368ff38909f502565a4bf0f39e70fc6c
Parents: 2c6d3c7
Author: Jun Rao <junrao@gmail.com>
Authored: Mon Oct 7 09:22:12 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Oct 7 09:22:12 2013 -0700
----------------------------------------------------------------------
bin/kafka-check-reassignment-status.sh | 17 ---
.../kafka/admin/CheckReassignmentStatus.scala | 110 -------------------
.../kafka/admin/ReassignPartitionsCommand.scala | 74 +++++++++++--
3 files changed, 63 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/71ed6ca3/bin/kafka-check-reassignment-status.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-check-reassignment-status.sh b/bin/kafka-check-reassignment-status.sh
deleted file mode 100755
index 1f21858..0000000
--- a/bin/kafka-check-reassignment-status.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-$(dirname $0)/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@
http://git-wip-us.apache.org/repos/asf/kafka/blob/71ed6ca3/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
deleted file mode 100644
index 7e85f87..0000000
--- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import joptsimple.OptionParser
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils._
-import scala.collection.Map
-import kafka.common.TopicAndPartition
-
-object CheckReassignmentStatus extends Logging {
-
- def main(args: Array[String]): Unit = {
- val parser = new OptionParser
- val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
- "new replicas they should be reassigned to")
- .withRequiredArg
- .describedAs("partition reassignment json file path")
- .ofType(classOf[String])
- val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
- "form host:port. Multiple URLS can be given to allow fail-over.")
- .withRequiredArg
- .describedAs("urls")
- .ofType(classOf[String])
-
- val options = parser.parse(args : _*)
-
- for(arg <- List(jsonFileOpt, zkConnectOpt)) {
- if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
- }
-
- val jsonFile = options.valueOf(jsonFileOpt)
- val zkConnect = options.valueOf(zkConnectOpt)
- val jsonString = Utils.readFileAsString(jsonFile)
- val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-
- try {
- // read the json file into a string
- val partitionsToBeReassigned = Json.parseFull(jsonString) match {
- case Some(reassignedPartitions) =>
- val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
- partitions.map { m =>
- val topic = m.asInstanceOf[Map[String, String]].get("topic").get
- val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
- val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
- val newReplicas = replicasList.split(",").map(_.toInt)
- (TopicAndPartition(topic, partition), newReplicas.toSeq)
- }.toMap
- case None => Map.empty[TopicAndPartition, Seq[Int]]
- }
-
- val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
- reassignedPartitionsStatus.foreach { partition =>
- partition._2 match {
- case ReassignmentCompleted =>
- println("Partition %s reassignment completed successfully".format(partition._1))
- case ReassignmentFailed =>
- println("Partition %s reassignment failed".format(partition._1))
- case ReassignmentInProgress =>
- println("Partition %s reassignment in progress".format(partition._1))
- }
- }
- }
- }
-
- def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
- :Map[TopicAndPartition, ReassignmentStatus] = {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
- // for all partitions whose replica reassignment is complete, check the status
- partitionsToBeReassigned.map { topicAndPartition =>
- (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
- topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
- }
- }
-
- def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
- reassignedReplicas: Seq[Int],
- partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
- partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
- val newReplicas = partitionsToBeReassigned(topicAndPartition)
- partitionsBeingReassigned.get(topicAndPartition) match {
- case Some(partition) => ReassignmentInProgress
- case None =>
- // check if AR == RAR
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
- if(assignedReplicas == newReplicas)
- ReassignmentCompleted
- else
- ReassignmentFailed
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/71ed6ca3/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index f333d29..c6fc4ab 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -58,6 +58,12 @@ object ReassignPartitionsCommand extends Logging {
.describedAs("execute")
.ofType(classOf[String])
+ val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
+ "new replicas they should be reassigned to, which can be obtained from the output of a dry run.")
+ .withRequiredArg
+ .describedAs("partition reassignment json file path")
+ .ofType(classOf[String])
+
val options = parser.parse(args : _*)
for(arg <- List(zkConnectOpt)) {
@@ -80,7 +86,24 @@ object ReassignPartitionsCommand extends Logging {
var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
- if(options.has(topicsToMoveJsonFileOpt)) {
+ if(options.has(statusCheckJsonFileOpt)) {
+ val jsonFile = options.valueOf(statusCheckJsonFileOpt)
+ val jsonString = Utils.readFileAsString(jsonFile)
+ val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+
+ println("Status of partition reassignment:")
+ val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+ reassignedPartitionsStatus.foreach { partition =>
+ partition._2 match {
+ case ReassignmentCompleted =>
+ println("Reassignment of partition %s completed successfully".format(partition._1))
+ case ReassignmentFailed =>
+ println("Reassignment of partition %s failed".format(partition._1))
+ case ReassignmentInProgress =>
+ println("Reassignment of partition %s is still in progress".format(partition._1))
+ }
+ }
+ } else if(options.has(topicsToMoveJsonFileOpt)) {
val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
val brokerList = options.valueOf(brokerListOpt)
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
@@ -107,16 +130,19 @@ object ReassignPartitionsCommand extends Logging {
System.exit(1)
}
- if (options.has(executeOpt)) {
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
-
- if(reassignPartitionsCommand.reassignPartitions())
- println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
- else
- println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
- } else {
- System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
- "The replica assignment is \n" + partitionsToBeReassigned.toString())
+ if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) {
+ if (options.has(executeOpt)) {
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+
+ if(reassignPartitionsCommand.reassignPartitions())
+ println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+ else
+ println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+ } else {
+ System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
+ "The following is the replica assignment. Save it for the status check option.\n" +
+ ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))
+ }
}
} catch {
case e: Throwable =>
@@ -127,6 +153,32 @@ object ReassignPartitionsCommand extends Logging {
zkClient.close()
}
}
+
+ private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
+ :Map[TopicAndPartition, ReassignmentStatus] = {
+ val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+ partitionsToBeReassigned.map { topicAndPartition =>
+ (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
+ topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
+ }
+ }
+
+ private def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
+ reassignedReplicas: Seq[Int],
+ partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
+ partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
+ val newReplicas = partitionsToBeReassigned(topicAndPartition)
+ partitionsBeingReassigned.get(topicAndPartition) match {
+ case Some(partition) => ReassignmentInProgress
+ case None =>
+ // check if the current replica assignment matches the expected one after reassignment
+ val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
+ if(assignedReplicas == newReplicas)
+ ReassignmentCompleted
+ else
+ ReassignmentFailed
+ }
+ }
}
class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
|