kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4200; Fix throttle argument in kafka-reassign-partitions.sh
Date Tue, 27 Sep 2016 13:34:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 98a625028 -> 55631c976


KAFKA-4200; Fix throttle argument in kafka-reassign-partitions.sh

Simple jira which alters two things:

1. kafka-reassign-partitions --verify prints "Throttle was removed." regardless of whether a
throttle was applied. It should only print this if the value was actually changed.

2. --verify should throw an exception if the --throttle argument is supplied (the same check is applied to --generate).

To test this I extracted all validation logic into a separate method and added a test which
covers the majority of combinations. The validation logic was retained as is, other than implementing
(2) and adding validation to the --broker-list option which you can currently apply to any
of the main actions (where it is ignored). Requirement 1 was tested manually (as it's just
println).

Testing:
- Build passes locally.
- System test reassign_partitions_test.py also passes.

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1896 from benstopford/KAFKA-4200

(cherry picked from commit 5d6408f6cfda3f8ab366195f69e90de048cde25d)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55631c97
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55631c97
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55631c97

Branch: refs/heads/0.10.1
Commit: 55631c976639c5c9295055553a6fe3bd06a66b21
Parents: 98a6250
Author: Ben Stopford <benstopford@gmail.com>
Authored: Tue Sep 27 14:00:44 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Sep 27 14:34:00 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  71 +++---
 .../scala/kafka/utils/CommandLineUtils.scala    |  18 +-
 .../ReassignPartitionsCommandArgsTest.scala     | 232 +++++++++++++++++++
 3 files changed, 292 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/55631c97/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index ad050b4..5059463 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -16,7 +16,6 @@
  */
 package kafka.admin
 
-import java.text.NumberFormat._
 import java.util.Properties
 import joptsimple.OptionParser
 import kafka.log.LogConfig
@@ -30,19 +29,9 @@ import org.apache.kafka.common.security.JaasUtils
 
 object ReassignPartitionsCommand extends Logging {
 
-  //TODO Note to reviewer - this class needs a little more work (which I'll complete on Monday,
or we could just revert this, but including here as an outline of what is intended)
-
   def main(args: Array[String]): Unit = {
 
-    val opts = new ReassignPartitionsCommandOptions(args)
-
-    // should have exactly one action
-    val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has
_)
-    if(actions != 1)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action:
--generate, --execute or --verify")
-
-    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
-
+    val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
     val zkUtils = ZkUtils(zkConnect,
                           30000,
@@ -63,15 +52,13 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
-    if(!opts.options.has(opts.reassignmentJsonFileOpt))
-      CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command
must include --reassignment-json-file that was used during the --execute option")
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
     verifyAssignment(zkUtils, jsonString)
   }
 
   def verifyAssignment(zkUtils: ZkUtils, jsonString: String): Unit = {
-    println("Status of partition reassignment:")
+    println("Status of partition reassignment: ")
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
     val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
     reassignedPartitionsStatus.foreach { case (topicPartition, status) =>
@@ -87,30 +74,35 @@ object ReassignPartitionsCommand extends Logging {
     removeThrottle(zkUtils, partitionsToBeReassigned, reassignedPartitionsStatus)
   }
 
-  def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, scala.Seq[Int]],
reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]): Unit = {
+  private def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition,
scala.Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]):
Unit = {
+    var changed = false
+
     //If all partitions have completed remove the throttle
     if (reassignedPartitionsStatus.forall { case (topicPartition, status) => status ==
ReassignmentCompleted }) {
       //Remove the throttle limit from all brokers in the cluster
       for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
-        if (configs.remove(KafkaConfig.ThrottledReplicationRateLimitProp) != null)
+        if (configs.remove(KafkaConfig.ThrottledReplicationRateLimitProp) != null) {
           AdminUtils.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
+          changed = true
+        }
       }
 
       //Remove the list of throttled replicas from all topics with partitions being moved
       val topics = partitionsToBeReassigned.keySet.map(tp => tp.topic).toSeq.distinct
       for (topic <- topics) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
-        if (configs.remove(LogConfig.ThrottledReplicasListProp) != null)
+        if (configs.remove(LogConfig.ThrottledReplicasListProp) != null) {
           AdminUtils.changeTopicConfig(zkUtils, topic, configs)
+          changed = true
+        }
       }
-      println("Throttle was removed.")
+      if (changed)
+        println("Throttle was removed.")
     }
   }
 
   def generateAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
-    if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
-      CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command
must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
     val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign)
@@ -146,8 +138,6 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
-    if(!opts.options.has(opts.reassignmentJsonFileOpt))
-      CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command
must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
     val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt)
else -1
@@ -228,6 +218,38 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
+  def validateAndParseArgs(args: Array[String]): ReassignPartitionsCommandOptions = {
+    val opts = new ReassignPartitionsCommandOptions(args)
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(opts.parser, "This command moves topic partitions
between replicas.")
+
+    // Should have exactly one action
+    val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has
_)
+    if(actions != 1)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action:
--generate, --execute or --verify")
+
+    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
+
+    //Validate arguments for each action
+    if(opts.options.has(opts.verifyOpt)) {
+      if(!opts.options.has(opts.reassignmentJsonFileOpt))
+        CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command
must include --reassignment-json-file that was used during the --execute option")
+      CommandLineUtils.checkInvalidArgs(opts.parser, opts.options, opts.verifyOpt, Set(opts.throttleOpt,
opts.topicsToMoveJsonFileOpt, opts.disableRackAware, opts.brokerListOpt))
+    }
+    else if(opts.options.has(opts.generateOpt)) {
+      if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
+        CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command
must include both --topics-to-move-json-file and --broker-list options")
+      CommandLineUtils.checkInvalidArgs(opts.parser, opts.options, opts.generateOpt, Set(opts.throttleOpt,
opts.reassignmentJsonFileOpt))
+    }
+    else if (opts.options.has(opts.executeOpt)){
+      if(!opts.options.has(opts.reassignmentJsonFileOpt))
+        CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command
must include --reassignment-json-file that was output " + "during the --generate option")
+      CommandLineUtils.checkInvalidArgs(opts.parser, opts.options, opts.executeOpt, Set(opts.topicsToMoveJsonFileOpt,
opts.disableRackAware, opts.brokerListOpt))
+    }
+    opts
+  }
+
   class ReassignPartitionsCommandOptions(args: Array[String]) {
     val parser = new OptionParser
 
@@ -263,9 +285,6 @@ object ReassignPartitionsCommand extends Logging {
                       .describedAs("throttle")
                       .defaultsTo("-1")
                       .ofType(classOf[Long])
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between
replicas.")
-
     val options = parser.parse(args : _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/55631c97/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index ba5a789..edc3621 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -25,6 +25,16 @@ import java.util.Properties
  */
 object CommandLineUtils extends Logging {
 
+   trait ExitPolicy {
+     def exit(msg: String): Nothing
+   }
+
+   val DEFAULT_EXIT_POLICY = new ExitPolicy {
+     override def exit(msg: String): Nothing = sys.exit(1)
+   }
+
+   private var exitPolicy = DEFAULT_EXIT_POLICY
+
   /**
    * Check that all the listed options are present
    */
@@ -34,7 +44,7 @@ object CommandLineUtils extends Logging {
         printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
     }
   }
-  
+
   /**
    * Check that none of the listed options are present
    */
@@ -46,16 +56,18 @@ object CommandLineUtils extends Logging {
       }
     }
   }
-  
+
   /**
    * Print usage and exit
    */
   def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
     System.err.println(message)
     parser.printHelpOn(System.err)
-    sys.exit(1)
+    exitPolicy.exit(message)
   }
 
+  def exitPolicy(policy: ExitPolicy): Unit = this.exitPolicy = policy
+
   /**
    * Parse key-value pairs in the form key=value
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/55631c97/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
new file mode 100644
index 0000000..1685130
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
@@ -0,0 +1,232 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.admin
+
+import kafka.utils.CommandLineUtils
+import kafka.utils.CommandLineUtils.ExitPolicy
+import org.junit.Assert.assertTrue
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+class ReassignPartitionsCommandArgsTest extends JUnitSuite {
+
+  @Before
+  def setUp() {
+    CommandLineUtils.exitPolicy(new ExitPolicy {
+      override def exit(msg: String): Nothing = throw new IllegalArgumentException(msg)
+    })
+  }
+
+  @After
+  def tearDown() {
+    CommandLineUtils.exitPolicy(CommandLineUtils.DEFAULT_EXIT_POLICY)
+  }
+
+  /**
+    * HAPPY PATH
+    */
+
+  @Test
+  def shouldCorrectlyParseValidMinimumGenerateOptions(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--generate",
+      "--broker-list", "101,102",
+      "--topics-to-move-json-file", "myfile.json")
+    ReassignPartitionsCommand.validateAndParseArgs(args)
+  }
+
+  @Test
+  def shouldCorrectlyParseValidMinimumExecuteOptions(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--execute",
+      "--reassignment-json-file", "myfile.json")
+    ReassignPartitionsCommand.validateAndParseArgs(args)
+  }
+
+  @Test
+  def shouldCorrectlyParseValidMinimumVerifyOptions(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--verify",
+      "--reassignment-json-file", "myfile.json")
+    ReassignPartitionsCommand.validateAndParseArgs(args)
+  }
+
+  @Test
+  def shouldAllowThrottleOptionOnExecute(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--execute",
+      "--throttle", "100",
+      "--reassignment-json-file", "myfile.json")
+    ReassignPartitionsCommand.validateAndParseArgs(args)
+  }
+
+  /**
+    * NO ARGS
+    */
+
+  @Test
+  def shouldFailIfNoArgs(): Unit = {
+    val args: Array[String]= Array()
+    shouldFailWith("This command moves topic partitions between replicas.", args)
+  }
+
+  @Test
+  def shouldFailIfBlankArg(): Unit = {
+    val args = Array(" ")
+    shouldFailWith("Command must include exactly one action: --generate, --execute or --verify",
args)
+  }
+
+  /**
+    * UNHAPPY PATH: EXECUTE ACTION
+    */
+
+  @Test
+  def shouldNotAllowExecuteWithTopicsOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--execute",
+      "--reassignment-json-file", "myfile.json",
+      "--topics-to-move-json-file", "myfile.json")
+    shouldFailWith("Option \"[execute]\" can't be used with option\"[topics-to-move-json-file]\"",
args)
+  }
+
+  @Test
+  def shouldNotAllowExecuteWithBrokers(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--execute",
+      "--reassignment-json-file", "myfile.json",
+      "--broker-list", "101,102"
+    )
+    shouldFailWith("Option \"[execute]\" can't be used with option\"[broker-list]\"", args)
+  }
+
+  @Test
+  def shouldNotAllowExecuteWithoutReassignmentOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--execute")
+    shouldFailWith("If --execute option is used, command must include --reassignment-json-file
that was output during the --generate option", args)
+  }
+
+  /**
+    * UNHAPPY PATH: GENERATE ACTION
+    */
+
+  @Test
+  def shouldNotAllowGenerateWithoutBrokersAndTopicsOptions(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--generate")
+    shouldFailWith("If --generate option is used, command must include both --topics-to-move-json-file
and --broker-list options", args)
+  }
+
+  @Test
+  def shouldNotAllowGenerateWithoutBrokersOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--topics-to-move-json-file", "myfile.json",
+      "--generate")
+    shouldFailWith("If --generate option is used, command must include both --topics-to-move-json-file
and --broker-list options", args)
+  }
+
+  @Test
+  def shouldNotAllowGenerateWithoutTopicsOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--broker-list", "101,102",
+      "--generate")
+    shouldFailWith("If --generate option is used, command must include both --topics-to-move-json-file
and --broker-list options", args)
+  }
+
+  @Test
+  def shouldNotAllowGenerateWithThrottleOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--generate",
+      "--broker-list", "101,102",
+      "--throttle", "100",
+      "--topics-to-move-json-file", "myfile.json")
+    shouldFailWith("Option \"[generate]\" can't be used with option\"[throttle]\"", args)
+  }
+
+  @Test
+  def shouldNotAllowGenerateWithReassignmentOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--generate",
+      "--broker-list", "101,102",
+      "--topics-to-move-json-file", "myfile.json",
+      "--reassignment-json-file", "myfile.json")
+    shouldFailWith("Option \"[generate]\" can't be used with option\"[reassignment-json-file]\"",
args)
+  }
+
+  /**
+    * UNHAPPY PATH: VERIFY ACTION
+    */
+
+  @Test
+  def shouldNotAllowVerifyWithoutReassignmentOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--verify")
+    shouldFailWith("If --verify option is used, command must include --reassignment-json-file
that was used during the --execute option", args)
+  }
+
+  @Test
+  def shouldNotAllowBrokersListWithVerifyOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--verify",
+      "--broker-list", "100,101",
+      "--reassignment-json-file", "myfile.json")
+    shouldFailWith("Option \"[verify]\" can't be used with option\"[broker-list]\"", args)
+  }
+
+  @Test
+  def shouldNotAllowThrottleWithVerifyOption(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--verify",
+      "--throttle", "100",
+      "--reassignment-json-file", "myfile.json")
+    shouldFailWith("Option \"[verify]\" can't be used with option\"[throttle]\"", args)
+  }
+
+  @Test
+  def shouldNotAllowTopicsOptionWithVerify(): Unit = {
+    val args = Array(
+      "--zookeeper", "localhost:1234",
+      "--verify",
+      "--reassignment-json-file", "myfile.json",
+      "--topics-to-move-json-file", "myfile.json")
+    shouldFailWith("Option \"[verify]\" can't be used with option\"[topics-to-move-json-file]\"",
args)
+  }
+
+  def shouldFailWith(msg: String, args: Array[String]): Unit = {
+    try {
+      ReassignPartitionsCommand.validateAndParseArgs(args)
+      fail(s"Should have failed with [$msg] but no failure occurred.")
+    } catch {
+      case e: Exception => assertTrue(s"Expected exception with message:\n[$msg]\nbut
was\n[${e.getMessage}]", e.getMessage.startsWith(msg))
+    }
+  }
+}


Mime
View raw message