kafka-commits mailing list archives

From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7418: Add the missing '--help' option to Kafka commands (KIP-374)
Date Thu, 22 Nov 2018 11:43:03 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad26914  KAFKA-7418: Add the missing '--help' option to Kafka commands (KIP-374)
ad26914 is described below

commit ad26914de65e2db244fbe36a2a5fd03de87dfb79
Author: Srinivas Reddy <srinivas96alluri@gmail.com>
AuthorDate: Thu Nov 22 17:12:34 2018 +0530

    KAFKA-7418: Add the missing '--help' option to Kafka commands (KIP-374)
    
    Changes made as part of this [KIP-374](https://cwiki.apache.org/confluence/x/FgSQBQ) and [KAFKA-7418](https://issues.apache.org/jira/browse/KAFKA-7418)
     - Added a check for empty args or the `--help` option in each command to
       print usage
     - Added a new class, `CommandDefaultOptions`, to enforce the help option
       across all commands (a sketch follows below)
     - Refactored a few classes (e.g. `PreferredReplicaLeaderElectionCommand`) to
       make use of `CommandDefaultOptions` attributes
     - Adjusted the help text wording
    
    Ran the unit tests locally (on Windows); a few Linux-specific tests fail, but
    no functionality is affected. Verified the `--help` and no-argument responses
    by running the Scala classes directly, since they all have a `main` method.
    
    Author: Srinivas Reddy <srinivas96alluri@gmail.com>
    Author: Srinivas Reddy <mrsrinivas@users.noreply.github.com>
    Author: Srinivas <srinivas96alluri@gmail.com>
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Manikumar Reddy <manikumar.reddy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
    
    Closes #5910 from mrsrinivas/KIP-374
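
The diffstat below shows that core/src/main/scala/kafka/utils/CommandDefaultOptions.scala is added with 26 lines. Its body is not included in this message, but based on how the command classes in this diff use it (a shared `parser`, a predefined `helpOpt`, and an `options` field assigned after parsing), a minimal sketch of the base class could look like the following; the exact contents are an assumption, not the committed code.

```scala
// A minimal sketch of the new base class, inferred from the call sites in this diff;
// the exact contents of CommandDefaultOptions.scala are an assumption.
package kafka.utils

import joptsimple.{OptionParser, OptionSet}

abstract class CommandDefaultOptions(val args: Array[String]) {
  // Shared parser, replacing the per-command `val parser = new OptionParser(false)`
  val parser = new OptionParser(false)
  // The previously missing option, now defined once for every command
  val helpOpt = parser.accepts("help", "Print usage information.")
  // Assigned by each subclass after it registers its own options: options = parser.parse(args: _*)
  var options: OptionSet = _
}
```
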
---
 .../kafka/connect/cli/ConnectDistributed.java      |   5 +-
 .../kafka/connect/cli/ConnectStandalone.java       |   4 +-
 core/src/main/scala/kafka/admin/AclCommand.scala   |  13 +-
 .../kafka/admin/BrokerApiVersionsCommand.scala     |  11 +-
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  14 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  12 +-
 .../scala/kafka/admin/DelegationTokenCommand.scala |  12 +-
 .../scala/kafka/admin/DeleteRecordsCommand.scala   |  18 +-
 .../main/scala/kafka/admin/LogDirsCommand.scala    |  13 +-
 .../PreferredReplicaLeaderElectionCommand.scala    |  60 ++--
 .../kafka/admin/ReassignPartitionsCommand.scala    |  14 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala |  14 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala     |  44 +--
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  11 +-
 .../main/scala/kafka/tools/ConsoleProducer.scala   |  20 +-
 .../scala/kafka/tools/ConsumerPerformance.scala    |  18 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 207 +++++++------
 core/src/main/scala/kafka/tools/MirrorMaker.scala  | 324 ++++++++++-----------
 core/src/main/scala/kafka/tools/PerfConfig.scala   |   6 +-
 .../main/scala/kafka/tools/StreamsResetter.java    |  40 +--
 .../scala/kafka/utils/CommandDefaultOptions.scala  |  26 ++
 .../main/scala/kafka/utils/CommandLineUtils.scala  |  41 ++-
 .../admin/ReassignPartitionsCommandArgsTest.scala  |   2 +-
 23 files changed, 478 insertions(+), 451 deletions(-)
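
kafka/utils/CommandLineUtils.scala also grows by roughly 40 lines to host the `printHelpAndExitIfNeeded` helper that the commands above now call in place of the old `if (args.length == 0) CommandLineUtils.printUsageAndDie(...)` pattern. Judging only from its call sites in this diff, it plausibly reduces to something like the hedged sketch below; the real implementation may differ.

```scala
// Hypothetical sketch of the helper added to kafka.utils.CommandLineUtils, inferred from
// how the commands in this diff call it; not necessarily the committed implementation.
def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: String): Unit = {
  // Preserve the old behaviour: no arguments at all, or an explicit --help, prints usage and exits.
  if (commandOpts.args.isEmpty || commandOpts.options.has(commandOpts.helpOpt))
    printUsageAndDie(commandOpts.parser, message)
}
```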

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index f8c15de..dd43c37 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
@@ -53,8 +54,8 @@ import java.util.Map;
 public class ConnectDistributed {
     private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
 
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1) {
+    public static void main(String[] args) {
+        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
             log.info("Usage: ConnectDistributed worker.properties");
             Exit.exit(1);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index aba9d9c..3498ffe 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -55,9 +55,9 @@ import java.util.Map;
 public class ConnectStandalone {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
 
-        if (args.length < 2) {
+        if (args.length < 2 || Arrays.asList(args).contains("--help")) {
             log.info("Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...]");
             Exit.exit(1);
         }
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index e4cde13..397238a 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -26,10 +26,10 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => JAdminClient}
 import org.apache.kafka.common.acl._
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -53,8 +53,7 @@ object AclCommand extends Logging {
 
     val opts = new AclCommandOptions(args)
 
-    if (opts.options.has(opts.helpOpt))
-      CommandLineUtils.printUsageAndDie(opts.parser, "Usage:")
+    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manage acls on kafka.")
 
     opts.checkArgs()
 
@@ -461,8 +460,7 @@ object AclCommand extends Logging {
     }
   }
 
-  class AclCommandOptions(args: Array[String]) {
-    val parser = new OptionParser(false)
+  class AclCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
     val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
 
     val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
@@ -579,11 +577,9 @@ object AclCommand extends Logging {
     val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
       "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")
 
-    val helpOpt = parser.accepts("help", "Print usage information.")
-
     val forceOpt = parser.accepts("force", "Assume Yes to all queries and do not prompt.")
 
-    val options = parser.parse(args: _*)
+    options = parser.parse(args: _*)
 
     def checkArgs() {
       if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
@@ -621,7 +617,6 @@ object AclCommand extends Logging {
         CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.")
     }
   }
-
 }
 
 class PatternTypeConverter extends EnumConverter[PatternType](classOf[PatternType]) {
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index b25a8da..88acf7f 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -20,10 +20,9 @@ package kafka.admin
 import java.io.PrintStream
 import java.util.Properties
 
-import kafka.utils.CommandLineUtils
-import org.apache.kafka.common.utils.Utils
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.clients.CommonClientConfigs
-import joptsimple._
+import org.apache.kafka.common.utils.Utils
 
 import scala.util.{Failure, Success}
 
@@ -59,11 +58,10 @@ object BrokerApiVersionsCommand {
     AdminClient.create(props)
   }
 
-  class BrokerVersionCommandOptions(args: Array[String]) {
+  class BrokerVersionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
     val BootstrapServerDoc = "REQUIRED: The server to connect to."
     val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
 
-    val parser = new OptionParser(false)
     val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
                                  .withRequiredArg
                                  .describedAs("command config property file")
@@ -72,10 +70,11 @@ object BrokerApiVersionsCommand {
                                    .withRequiredArg
                                    .describedAs("server(s) to use for bootstrapping")
                                    .ofType(classOf[String])
-    val options = parser.parse(args : _*)
+    options = parser.parse(args : _*)
     checkArgs()
 
     def checkArgs() {
+      CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to retrieve broker version information.")
       // check required args
       CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
     }
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index d8dade0..f1aa9ff 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,7 +24,7 @@ import joptsimple._
 import kafka.common.Config
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
-import kafka.utils.{CommandLineUtils, Exit, PasswordEncoder}
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncoder}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
@@ -36,8 +36,8 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
 
-import scala.collection._
 import scala.collection.JavaConverters._
+import scala.collection._
 
 
 /**
@@ -72,8 +72,7 @@ object ConfigCommand extends Config {
     try {
       val opts = new ConfigCommandOptions(args)
 
-      if (args.length == 0)
-        CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client, user or broker")
+      CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manipulate and describe entity config for a topic, client, user or broker")
 
       opts.checkArgs()
 
@@ -464,8 +463,8 @@ object ConfigCommand extends Config {
     ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None)
   }
 
-  class ConfigCommandOptions(args: Array[String]) {
-    val parser = new OptionParser(false)
+  class ConfigCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
             "Multiple URLS can be given to allow fail-over.")
             .withRequiredArg
@@ -504,9 +503,8 @@ object ConfigCommand extends Config {
             .withRequiredArg
             .ofType(classOf[String])
             .withValuesSeparatedBy(',')
-    val helpOpt = parser.accepts("help", "Print usage information.")
     val forceOpt = parser.accepts("force", "Suppress console prompts")
-    val options = parser.parse(args : _*)
+    options = parser.parse(args : _*)
 
     val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt)
 
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index c0f6797..f27f2c3 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -22,11 +22,11 @@ import java.util
 import java.util.{Date, Properties}
 
 import javax.xml.datatype.DatatypeFactory
-import joptsimple.{OptionParser, OptionSpec}
+import joptsimple.OptionSpec
 import kafka.utils._
-import org.apache.kafka.clients.{CommonClientConfigs, admin}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.{CommonClientConfigs, admin}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
@@ -41,8 +41,7 @@ object ConsumerGroupCommand extends Logging {
   def main(args: Array[String]) {
     val opts = new ConsumerGroupCommandOptions(args)
 
-    if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
+    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
 
     // should have exactly one action
     val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has)
@@ -669,7 +668,7 @@ object ConsumerGroupCommand extends Logging {
     case object Ignore extends LogOffsetResult
   }
 
-  class ConsumerGroupCommandOptions(args: Array[String]) {
+  class ConsumerGroupCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
     val BootstrapServerDoc = "REQUIRED: The server(s) to connect to."
     val GroupDoc = "The consumer group we wish to act on."
     val TopicDoc = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
@@ -712,7 +711,6 @@ object ConsumerGroupCommand extends Logging {
     val StateDoc = "Describe the group state. This option may be used with '--describe' and '--bootstrap-server' options only." + nl +
       "Example: --bootstrap-server localhost:9092 --describe --group group1 --state"
 
-    val parser = new OptionParser(false)
     val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
                                    .withRequiredArg
                                    .describedAs("server to connect to")
@@ -776,7 +774,7 @@ object ConsumerGroupCommand extends Logging {
 
     parser.mutuallyExclusive(membersOpt, offsetsOpt, stateOpt)
 
-    val options = parser.parse(args : _*)
+    options = parser.parse(args : _*)
 
     val describeOptPresent = options.has(describeOpt)
 
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 616d4dc..4837180 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -21,8 +21,8 @@ import java.text.SimpleDateFormat
 import java.util
 import java.util.Base64
 
-import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser}
-import kafka.utils.{CommandLineUtils, Exit, Logging}
+import joptsimple.ArgumentAcceptingOptionSpec
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -40,8 +40,7 @@ object DelegationTokenCommand extends Logging {
   def main(args: Array[String]): Unit = {
     val opts = new DelegationTokenCommandOptions(args)
 
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Tool to create, renew, expire, or describe delegation tokens.")
+    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to create, renew, expire, or describe delegation tokens.")
 
     // should have exactly one action
     val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, opts.describeOpt).count(opts.options.has _)
@@ -150,12 +149,11 @@ object DelegationTokenCommand extends Logging {
     JAdminClient.create(props)
   }
 
-  class DelegationTokenCommandOptions(args: Array[String]) {
+  class DelegationTokenCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
     val BootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping."
     val CommandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
       " operations are allowed in secure mode only. This config file is used to pass security related configs."
 
-    val parser = new OptionParser(false)
     val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
                                    .withRequiredArg
                                    .ofType(classOf[String])
@@ -196,7 +194,7 @@ object DelegationTokenCommand extends Logging {
       .withOptionalArg
       .ofType(classOf[String])
 
-    val options = parser.parse(args : _*)
+    options = parser.parse(args : _*)
 
     def checkArgs() {
       // check required args
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 4a85d09..10977c2 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -21,14 +21,12 @@ import java.io.PrintStream
 import java.util.Properties
 
 import kafka.common.AdminCommandFailedException
-import kafka.utils.{CommandLineUtils, CoreUtils, Json}
+import kafka.utils.json.JsonValue
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json}
+import org.apache.kafka.clients.admin.RecordsToDelete
+import org.apache.kafka.clients.{CommonClientConfigs, admin}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.admin
-import org.apache.kafka.clients.admin.RecordsToDelete
-import org.apache.kafka.clients.CommonClientConfigs
-import joptsimple._
-import kafka.utils.json.JsonValue
 
 import scala.collection.JavaConverters._
 
@@ -110,13 +108,12 @@ object DeleteRecordsCommand {
     admin.AdminClient.create(props)
   }
 
-  class DeleteRecordsCommandOptions(args: Array[String]) {
+  class DeleteRecordsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
     val BootstrapServerDoc = "REQUIRED: The server to connect to."
     val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per partition. The format to use is:\n" +
                                  "{\"partitions\":\n  [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}"
     val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
 
-    val parser = new OptionParser(false)
     val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
                                    .withRequiredArg
                                    .describedAs("server(s) to use for bootstrapping")
@@ -130,7 +127,10 @@ object DeleteRecordsCommand {
                                    .describedAs("command config property file path")
                                    .ofType(classOf[String])
 
-    val options = parser.parse(args : _*)
+    options = parser.parse(args : _*)
+
+    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to delete records of the given partitions down to the specified offset.")
+
     CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt)
   }
 }
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index b51f25d..3beb29b 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -20,14 +20,13 @@ package kafka.admin
 import java.io.PrintStream
 import java.util.Properties
 
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
 import org.apache.kafka.clients.admin.{AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
-import kafka.utils.{CommandLineUtils, Json}
-import joptsimple._
-import org.apache.kafka.common.utils.Utils
 
 /**
   * A command for querying log directory usage on the specified brokers
@@ -93,8 +92,7 @@ object LogDirsCommand {
         JAdminClient.create(props)
     }
 
-    class LogDirsCommandOptions(args: Array[String]) {
-        val parser = new OptionParser(false)
+    class LogDirsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args){
         val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
           .withRequiredArg
           .describedAs("The server(s) to use for bootstrapping")
@@ -116,7 +114,10 @@ object LogDirsCommand {
           .describedAs("Broker list")
           .ofType(classOf[String])
 
-        val options = parser.parse(args : _*)
+        options = parser.parse(args : _*)
+
+        CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to query log directory usage on the specified brokers.")
+
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt)
     }
 }
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 89ab580..7bfecde 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -16,53 +16,36 @@
  */
 package kafka.admin
 
-import joptsimple.OptionParser
-import kafka.utils._
 import kafka.common.AdminCommandFailedException
+import kafka.utils._
 import kafka.zk.KafkaZkClient
-
-import collection._
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.zookeeper.KeeperException.NodeExistsException
 
+import collection._
+
 object PreferredReplicaLeaderElectionCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
-    val parser = new OptionParser(false)
-    val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " +
-      "for which preferred replica leader election should be done, in the following format - \n" +
-       "{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" +
-      "Defaults to all existing partitions")
-      .withRequiredArg
-      .describedAs("list of partitions for which preferred replica leader election needs to be triggered")
-      .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
-      "form host:port. Multiple URLS can be given to allow fail-over.")
-      .withRequiredArg
-      .describedAs("urls")
-      .ofType(classOf[String])
-      
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," + 
-                                                " it can be used to balance leadership among the servers.")
+    val commandOpts = new PreferredReplicaLeaderElectionCommandOptions(args)
+    CommandLineUtils.printHelpAndExitIfNeeded(commandOpts, "This tool helps to cause leadership for each partition to be transferred back to the 'preferred replica'," +
+      " it can be used to balance leadership among the servers.")
 
-    val options = parser.parse(args : _*)
+    CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options, commandOpts.zkConnectOpt)
 
-    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
-
-    val zkConnect = options.valueOf(zkConnectOpt)
+    val zkConnect = commandOpts.options.valueOf(commandOpts.zkConnectOpt)
     var zkClient: KafkaZkClient = null
     try {
       val time = Time.SYSTEM
       zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
 
       val partitionsForPreferredReplicaElection =
-        if (!options.has(jsonFileOpt))
+        if (!commandOpts.options.has(commandOpts.jsonFileOpt))
           zkClient.getAllPartitions()
         else
-          parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
+          parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt)))
       val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
@@ -109,13 +92,30 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
+
+  class PreferredReplicaLeaderElectionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+    val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " +
+      "for which preferred replica leader election should be done, in the following format - \n" +
+      "{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" +
+      "Defaults to all existing partitions")
+      .withRequiredArg
+      .describedAs("list of partitions for which preferred replica leader election needs to be triggered")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+      "form host:port. Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+
+    options = parser.parse(args: _*)
+  }
 }
 
 class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) {
   def moveLeaderToPreferredReplica() = {
     try {
       val topics = partitionsFromUser.map(_.topic).toSet
-      val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
+      val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap { case (topic, partitions) =>
         partitions.map(new TopicPartition(topic, _))
       }.toSet
 
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 61c9643..154e351 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import joptsimple.OptionParser
 import kafka.common.AdminCommandFailedException
 import kafka.log.LogConfig
 import kafka.log.LogConfig._
@@ -29,12 +28,11 @@ import kafka.utils.json.JsonValue
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
 import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
-import org.apache.kafka.common.TopicPartitionReplica
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -48,6 +46,8 @@ object ReassignPartitionsCommand extends Logging {
 
   private[admin] val EarliestVersion = 1
 
+  val helpText = "This tool helps to move topic partitions between replicas."
+
   def main(args: Array[String]): Unit = {
     val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
@@ -415,8 +415,7 @@ object ReassignPartitionsCommand extends Logging {
   def validateAndParseArgs(args: Array[String]): ReassignPartitionsCommandOptions = {
     val opts = new ReassignPartitionsCommandOptions(args)
 
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "This command moves topic partitions between replicas.")
+    CommandLineUtils.printHelpAndExitIfNeeded(opts, helpText)
 
     // Should have exactly one action
     val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _)
@@ -444,8 +443,7 @@ object ReassignPartitionsCommand extends Logging {
     opts
   }
 
-  class ReassignPartitionsCommandOptions(args: Array[String]) {
-    val parser = new OptionParser(false)
+  class ReassignPartitionsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args)  {
     val bootstrapServerOpt = parser.accepts("bootstrap-server", "the server(s) to use for bootstrapping. REQUIRED if " +
                       "an absolution path of the log directory is specified for any replica in the reassignment json file")
                       .withRequiredArg
@@ -500,7 +498,7 @@ object ReassignPartitionsCommand extends Logging {
                       .describedAs("timeout")
                       .ofType(classOf[Long])
                       .defaultsTo(10000)
-    val options = parser.parse(args : _*)
+    options = parser.parse(args : _*)
   }
 }
 
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 92cde7e..9d28452 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -21,18 +21,17 @@ import java.util.Properties
 
 import joptsimple._
 import kafka.common.AdminCommandFailedException
-import kafka.utils.Implicits._
-import kafka.utils.Whitelist
 import kafka.log.LogConfig
 import kafka.server.ConfigType
+import kafka.utils.Implicits._
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -44,8 +43,7 @@ object TopicCommand extends Logging {
 
     val opts = new TopicCommandOptions(args)
 
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
+    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to create, delete, describe, or change a topic.")
 
     // should have exactly one action
     val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
@@ -299,8 +297,7 @@ object TopicCommand extends Logging {
     ret.toMap
   }
 
-  class TopicCommandOptions(args: Array[String]) {
-    val parser = new OptionParser(false)
+  class TopicCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
                                       "Multiple hosts can be given to allow fail-over.")
                            .withRequiredArg
@@ -311,7 +308,6 @@ object TopicCommand extends Logging {
     val deleteOpt = parser.accepts("delete", "Delete a topic")
     val alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
-    val helpOpt = parser.accepts("help", "Print usage information.")
     val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
                                            "expression except for --create option")
                          .withRequiredArg
@@ -359,7 +355,7 @@ object TopicCommand extends Logging {
 
     val excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default")
 
-    val options = parser.parse(args : _*)
+    options = parser.parse(args : _*)
 
     val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
 
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 82b7dac..a833db4 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -17,14 +17,13 @@
 
 package kafka.admin
 
-import joptsimple.OptionParser
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging, ZkUtils}
 import org.I0Itec.zkclient.exception.ZkException
-import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
-import org.apache.zookeeper.data.Stat
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -63,21 +62,9 @@ object ZkSecurityMigrator extends Logging {
 
   def run(args: Array[String]) {
     val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    val parser = new OptionParser(false)
-    val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
-        + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
-    val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " +
-      "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181").
-      ofType(classOf[String])
-    val zkSessionTimeoutOpt = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout.").
-      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
-    val zkConnectionTimeoutOpt = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout.").
-      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
-    val helpOpt = parser.accepts("help", "Print usage information.")
+    val opts = new ZkSecurityMigratorOptions(args)
 
-    val options = parser.parse(args : _*)
-    if (options.has(helpOpt))
-      CommandLineUtils.printUsageAndDie(parser, usageMessage)
+    CommandLineUtils.printHelpAndExitIfNeeded(opts, usageMessage)
 
     if (jaasFile == null) {
      val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " +
@@ -92,7 +79,7 @@ object ZkSecurityMigrator extends Logging {
       throw new IllegalArgumentException("Incorrect configuration") 
     }
 
-    val zkAcl: Boolean = options.valueOf(zkAclOpt) match {
+    val zkAcl: Boolean = opts.options.valueOf(opts.zkAclOpt) match {
       case "secure" =>
         info("zookeeper.acl option is secure")
         true
@@ -100,11 +87,11 @@ object ZkSecurityMigrator extends Logging {
         info("zookeeper.acl option is unsecure")
         false
       case _ =>
-        CommandLineUtils.printUsageAndDie(parser, usageMessage)
+        CommandLineUtils.printUsageAndDie(opts.parser, usageMessage)
     }
-    val zkUrl = options.valueOf(zkUrlOpt)
-    val zkSessionTimeout = options.valueOf(zkSessionTimeoutOpt).intValue
-    val zkConnectionTimeout = options.valueOf(zkConnectionTimeoutOpt).intValue
+    val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
+    val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
+    val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
     val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl)
     val migrator = new ZkSecurityMigrator(zkUtils)
     migrator.run()
@@ -118,6 +105,19 @@ object ZkSecurityMigrator extends Logging {
           e.printStackTrace()
     }
   }
+
+  class ZkSecurityMigratorOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+    val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
+      + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
+    val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " +
+      "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181").
+      ofType(classOf[String])
+    val zkSessionTimeoutOpt = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout.").
+      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
+    val zkConnectionTimeoutOpt = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout.").
+      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
+    options = parser.parse(args : _*)
+  }
 }
 
 class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 17f340d..42c5c5b 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -27,8 +27,8 @@ import java.util.{Collections, Locale, Properties, Random}
 import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
 import kafka.common.MessageFormatter
-import kafka.utils._
 import kafka.utils.Implicits._
+import kafka.utils._
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
@@ -189,8 +189,7 @@ object ConsoleConsumer extends Logging {
     }
   }
 
-  class ConsumerConfig(args: Array[String]) {
-    val parser = new OptionParser(false)
+  class ConsumerConfig(args: Array[String]) extends CommandDefaultOptions(args) {
     val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
       .withRequiredArg
       .describedAs("topic")
@@ -275,11 +274,11 @@ object ConsoleConsumer extends Logging {
       .describedAs("consumer group id")
       .ofType(classOf[String])
 
-    if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
+    options = tryParse(parser, args)
+
+    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from Kafka topics and outputs it to standard output.")
 
     var groupIdPassed = true
-    val options: OptionSet = tryParse(parser, args)
     val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
 
     // topic must be specified.
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 9df53f5..da7d120 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -17,15 +17,14 @@
 
 package kafka.tools
 
-import kafka.common._
-import kafka.message._
-import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
-import kafka.utils.Implicits._
-import java.util.Properties
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.Properties
 
-import joptsimple._
+import kafka.common._
+import kafka.message._
+import kafka.utils.Implicits._
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, ToolsUtils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.KafkaException
@@ -109,8 +108,7 @@ object ConsoleProducer {
     props
   }
 
-  class ProducerConfig(args: Array[String]) {
-    val parser = new OptionParser(false)
+  class ProducerConfig(args: Array[String]) extends CommandDefaultOptions(args) {
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
       .withRequiredArg
       .describedAs("topic")
@@ -204,9 +202,9 @@ object ConsoleProducer {
       .describedAs("config file")
       .ofType(classOf[String])
 
-    val options = parser.parse(args : _*)
-    if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
+    options = parser.parse(args : _*)
+
+    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from standard input and publish it to Kafka.")
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
 
     val topic = options.valueOf(topicOpt)
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index a065204..226341e 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -17,22 +17,20 @@
 
 package kafka.tools
 
+import java.text.SimpleDateFormat
+import java.time.Duration
 import java.util
-
-import scala.collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicLong
+import java.util.{Properties, Random}
 
+import com.typesafe.scalalogging.LazyLogging
+import kafka.utils.{CommandLineUtils, ToolsUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
-import kafka.utils.{CommandLineUtils, ToolsUtils}
-import java.util.{Properties, Random}
-import java.text.SimpleDateFormat
-import java.time.Duration
-
-import com.typesafe.scalalogging.LazyLogging
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
@@ -252,7 +250,9 @@ object ConsumerPerformance extends LazyLogging {
       .ofType(classOf[Long])
       .defaultsTo(10000)
 
-    val options = parser.parse(args: _*)
+    options = parser.parse(args: _*)
+
+    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance testing of the full zookeeper consumer")
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt, bootstrapServersOpt)
 
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index b5b0e6e..4c8c4e1 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -38,110 +38,9 @@ import scala.collection.JavaConverters._
 object DumpLogSegments {
 
   def main(args: Array[String]) {
-    val parser = new OptionParser(false)
-    val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified.")
-    val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content.")
-    val indexSanityOpt = parser.accepts("index-sanity-check", "if set, just checks the index sanity without printing its content. " +
-      "This is the same check that is executed on broker startup to determine if an index needs rebuilding or not.")
-    val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped.")
-                           .withRequiredArg
-                           .describedAs("file1, file2, ...")
-                           .ofType(classOf[String])
-    val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.")
-                                  .withRequiredArg
-                                  .describedAs("size")
-                                  .ofType(classOf[java.lang.Integer])
-                                  .defaultsTo(5 * 1024 * 1024)
-    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
-    val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
-                               .withOptionalArg()
-                               .ofType(classOf[java.lang.String])
-                               .defaultsTo("kafka.serializer.StringDecoder")
-    val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
-                               .withOptionalArg()
-                               .ofType(classOf[java.lang.String])
-                               .defaultsTo("kafka.serializer.StringDecoder")
-    val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from the " +
-      "__consumer_offsets topic.")
-    val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " +
-      "transaction metadata from the __transaction_state topic.")
-
-    val helpOpt = parser.accepts("help", "Print usage information.")
-
-    val options = parser.parse(args : _*)
-
-    if(args.length == 0 || options.has(helpOpt))
-      CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
-
-    CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
-
-    val printDataLog = options.has(printOpt) ||
-      options.has(offsetsOpt) ||
-      options.has(transactionLogOpt) ||
-      options.has(valueDecoderOpt) ||
-      options.has(keyDecoderOpt)
-    val verifyOnly = options.has(verifyOpt)
-    val indexSanityOnly = options.has(indexSanityOpt)
-
-    val files = options.valueOf(filesOpt).split(",")
-    val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
-    val isDeepIteration = options.has(deepIterationOpt) || printDataLog
-
-    val messageParser = if (options.has(offsetsOpt)) {
-      new OffsetsMessageParser
-    } else if (options.has(transactionLogOpt)) {
-      new TransactionLogMessageParser
-    } else {
-      val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
-      val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
-      new DecoderMessageParser(keyDecoder, valueDecoder)
-    }
-
-    val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
-    val timeIndexDumpErrors = new TimeIndexDumpErrors
-    val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
-
-    for(arg <- files) {
-      val file = new File(arg)
-      println(s"Dumping $file")
-
-      val filename = file.getName
-      val suffix = filename.substring(filename.lastIndexOf("."))
-      suffix match {
-        case Log.LogFileSuffix =>
-          dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
-        case Log.IndexFileSuffix =>
-          dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
-        case Log.TimeIndexFileSuffix =>
-          dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
-        case Log.ProducerSnapshotFileSuffix =>
-          dumpProducerIdSnapshot(file)
-        case Log.TxnIndexFileSuffix =>
-          dumpTxnIndex(file)
-        case _ =>
-          System.err.println(s"Ignoring unknown file $file")
-      }
-    }
-
-    misMatchesForIndexFilesMap.foreach {
-      case (fileName, listOfMismatches) => {
-        System.err.println("Mismatches in :" + fileName)
-        listOfMismatches.foreach(m => {
-          System.err.println("  Index offset: %d, log offset: %d".format(m._1, m._2))
-        })
-      }
-    }
-
-    timeIndexDumpErrors.printErrors()
-
-    nonConsecutivePairsForLogFilesMap.foreach {
-      case (fileName, listOfNonConsecutivePairs) => {
-        System.err.println("Non-consecutive offsets in :" + fileName)
-        listOfNonConsecutivePairs.foreach(m => {
-          System.err.println("  %d is followed by %d".format(m._1, m._2))
-        })
-      }
-    }
+    val opts = new DumpLogSegmentsOptions(args)
+    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
+    opts.checkArgs()
   }
 
   private def dumpTxnIndex(file: File): Unit = {
@@ -495,4 +394,104 @@ object DumpLogSegments {
     }
   }
 
+  class DumpLogSegmentsOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+    val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified.")
+    val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content.")
+    val indexSanityOpt = parser.accepts("index-sanity-check", "if set, just checks the index sanity without printing its content. " +
+      "This is the same check that is executed on broker startup to determine if an index needs rebuilding or not.")
+    val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped.")
+      .withRequiredArg
+      .describedAs("file1, file2, ...")
+      .ofType(classOf[String])
+    val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(5 * 1024 * 1024)
+    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
+    val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+      .withOptionalArg()
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("kafka.serializer.StringDecoder")
+    val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+      .withOptionalArg()
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("kafka.serializer.StringDecoder")
+    val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from the " +
+      "__consumer_offsets topic.")
+    val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " +
+      "transaction metadata from the __transaction_state topic.")
+    options = parser.parse(args : _*)
+
+    def checkArgs() = {
+      CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
+
+      val printDataLog = options.has(printOpt) ||
+        options.has(offsetsOpt) ||
+        options.has(transactionLogOpt) ||
+        options.has(valueDecoderOpt) ||
+        options.has(keyDecoderOpt)
+      val verifyOnly = options.has(verifyOpt)
+      val indexSanityOnly = options.has(indexSanityOpt)
+
+      val files = options.valueOf(filesOpt).split(",")
+      val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
+      val isDeepIteration = options.has(deepIterationOpt) || printDataLog
+
+      val messageParser = if (options.has(offsetsOpt)) {
+        new OffsetsMessageParser
+      } else if (options.has(transactionLogOpt)) {
+        new TransactionLogMessageParser
+      } else {
+        val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
+        val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
+        new DecoderMessageParser(keyDecoder, valueDecoder)
+      }
+      val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
+      val timeIndexDumpErrors = new TimeIndexDumpErrors
+      val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
+
+      for(arg <- files) {
+        val file = new File(arg)
+        println(s"Dumping $file")
+
+        val filename = file.getName
+        val suffix = filename.substring(filename.lastIndexOf("."))
+        suffix match {
+          case Log.LogFileSuffix =>
+            dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
+          case Log.IndexFileSuffix =>
+            dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
+          case Log.TimeIndexFileSuffix =>
+            dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
+          case Log.ProducerSnapshotFileSuffix =>
+            dumpProducerIdSnapshot(file)
+          case Log.TxnIndexFileSuffix =>
+            dumpTxnIndex(file)
+          case _ =>
+            System.err.println(s"Ignoring unknown file $file")
+        }
+      }
+
+      misMatchesForIndexFilesMap.foreach {
+        case (fileName, listOfMismatches) => {
+          System.err.println("Mismatches in :" + fileName)
+          listOfMismatches.foreach(m => {
+            System.err.println("  Index offset: %d, log offset: %d".format(m._1, m._2))
+          })
+        }
+      }
+
+      timeIndexDumpErrors.printErrors()
+
+      nonConsecutivePairsForLogFilesMap.foreach {
+        case (fileName, listOfNonConsecutivePairs) => {
+          System.err.println("Non-consecutive offsets in :" + fileName)
+          listOfNonConsecutivePairs.foreach(m => {
+            System.err.println("  %d is followed by %d".format(m._1, m._2))
+          })
+        }
+      }
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index de2eba1..c685d8c 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -25,23 +25,22 @@ import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
-import joptsimple.OptionParser
 import kafka.consumer.BaseConsumerRecord
 import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist}
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import kafka.utils._
+import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
-import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
 import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
-import scala.util.{Failure, Success, Try}
 import scala.util.control.ControlThrowable
+import scala.util.{Failure, Success, Try}
 
 /**
  * The mirror maker has the following architecture:
@@ -86,165 +85,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     info("Starting mirror maker")
     try {
-      val parser = new OptionParser(false)
-
-      val consumerConfigOpt = parser.accepts("consumer.config",
-        "Embedded consumer config for consuming from the source cluster.")
-        .withRequiredArg()
-        .describedAs("config file")
-        .ofType(classOf[String])
-
-      parser.accepts("new.consumer",
-        "DEPRECATED Use new consumer in mirror maker (this is the default so this option will be removed in " +
-          "a future version).")
-
-      val producerConfigOpt = parser.accepts("producer.config",
-        "Embedded producer config.")
-        .withRequiredArg()
-        .describedAs("config file")
-        .ofType(classOf[String])
-
-      val numStreamsOpt = parser.accepts("num.streams",
-        "Number of consumption streams.")
-        .withRequiredArg()
-        .describedAs("Number of threads")
-        .ofType(classOf[java.lang.Integer])
-        .defaultsTo(1)
-
-      val whitelistOpt = parser.accepts("whitelist",
-        "Whitelist of topics to mirror.")
-        .withRequiredArg()
-        .describedAs("Java regex (String)")
-        .ofType(classOf[String])
-
-      val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
-        "Offset commit interval in ms.")
-        .withRequiredArg()
-        .describedAs("offset commit interval in millisecond")
-        .ofType(classOf[java.lang.Integer])
-        .defaultsTo(60000)
-
-      val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
-        "The consumer rebalance listener to use for mirror maker consumer.")
-        .withRequiredArg()
-        .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
-        .ofType(classOf[String])
-
-      val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
-        "Arguments used by custom rebalance listener for mirror maker consumer.")
-        .withRequiredArg()
-        .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
-        .ofType(classOf[String])
-
-      val messageHandlerOpt = parser.accepts("message.handler",
-        "Message handler which will process every record in-between consumer and producer.")
-        .withRequiredArg()
-        .describedAs("A custom message handler of type MirrorMakerMessageHandler")
-        .ofType(classOf[String])
-
-      val messageHandlerArgsOpt = parser.accepts("message.handler.args",
-        "Arguments used by custom message handler for mirror maker.")
-        .withRequiredArg()
-        .describedAs("Arguments passed to message handler constructor.")
-        .ofType(classOf[String])
-
-      val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
-        "Configure the mirror maker to exit on a failed send.")
-        .withRequiredArg()
-        .describedAs("Stop the entire mirror maker when a send failure occurs")
-        .ofType(classOf[String])
-        .defaultsTo("true")
-
-      val helpOpt = parser.accepts("help", "Print this message.")
-
-      if (args.length == 0)
-        CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
-
-
-      val options = parser.parse(args: _*)
-
-      if (options.has(helpOpt)) {
-        parser.printHelpOn(System.out)
-        sys.exit(0)
-      }
-
-      CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
-      val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
-
-      if (!options.has(whitelistOpt)) {
-        error("whitelist must be specified")
-        sys.exit(1)
-      }
-
-      if (!consumerProps.containsKey(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
-        System.err.println("WARNING: The default partition assignment strategy of the mirror maker will " +
-          "change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If " +
-          "you prefer to make this switch in advance of that release add the following to the corresponding " +
-          "config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'")
-
-      abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
-      offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
-      val numStreams = options.valueOf(numStreamsOpt).intValue()
-
-      Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
-        override def run() {
-          cleanShutdown()
-        }
-      })
-
-      // create producer
-      val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
-      val sync = producerProps.getProperty("producer.type", "async").equals("sync")
-      producerProps.remove("producer.type")
-      // Defaults to no data loss settings.
-      maybeSetDefaultProperty(producerProps, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Int.MaxValue.toString)
-      maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
-      maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
-      maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-      // Always set producer key and value serializer to ByteArraySerializer.
-      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
-      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
-      producer = new MirrorMakerProducer(sync, producerProps)
-
-      // Create consumers
-      val customRebalanceListener: Option[ConsumerRebalanceListener] = {
-        val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
-        if (customRebalanceListenerClass != null) {
-          val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
-          if (rebalanceListenerArgs != null)
-            Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
-          else
-            Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
-        } else {
-          None
-        }
-      }
-      val mirrorMakerConsumers = createConsumers(
-        numStreams,
-        consumerProps,
-        customRebalanceListener,
-        Option(options.valueOf(whitelistOpt)))
-
-      // Create mirror maker threads.
-      mirrorMakerThreads = (0 until numStreams) map (i =>
-        new MirrorMakerThread(mirrorMakerConsumers(i), i))
-
-      // Create and initialize message handler
-      val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
-      val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
-      messageHandler = {
-        if (customMessageHandlerClass != null) {
-          if (messageHandlerArgs != null)
-            CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
-          else
-            CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
-        } else {
-          defaultMirrorMakerMessageHandler
-        }
-      }
+      val opts = new MirrorMakerOptions(args)
+      CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to continuously copy data between two Kafka clusters.")
+      opts.checkArgs()
     } catch {
-      case ct : ControlThrowable => throw ct
-      case t : Throwable =>
+      case ct: ControlThrowable => throw ct
+      case t: Throwable =>
         error("Exception when starting mirror maker.", t)
     }
 
@@ -580,4 +426,152 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private class NoRecordsException extends RuntimeException
 
+  class MirrorMakerOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+
+    val consumerConfigOpt = parser.accepts("consumer.config",
+      "Embedded consumer config for consuming from the source cluster.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    parser.accepts("new.consumer",
+      "DEPRECATED Use new consumer in mirror maker (this is the default so this option will be removed in " +
+        "a future version).")
+
+    val producerConfigOpt = parser.accepts("producer.config",
+      "Embedded producer config.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    val numStreamsOpt = parser.accepts("num.streams",
+      "Number of consumption streams.")
+      .withRequiredArg()
+      .describedAs("Number of threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val whitelistOpt = parser.accepts("whitelist",
+      "Whitelist of topics to mirror.")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(classOf[String])
+
+    val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
+      "Offset commit interval in ms.")
+      .withRequiredArg()
+      .describedAs("offset commit interval in millisecond")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(60000)
+
+    val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
+      "The consumer rebalance listener to use for mirror maker consumer.")
+      .withRequiredArg()
+      .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
+      .ofType(classOf[String])
+
+    val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
+      "Arguments used by custom rebalance listener for mirror maker consumer.")
+      .withRequiredArg()
+      .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
+      .ofType(classOf[String])
+
+    val messageHandlerOpt = parser.accepts("message.handler",
+      "Message handler which will process every record in-between consumer and producer.")
+      .withRequiredArg()
+      .describedAs("A custom message handler of type MirrorMakerMessageHandler")
+      .ofType(classOf[String])
+
+    val messageHandlerArgsOpt = parser.accepts("message.handler.args",
+      "Arguments used by custom message handler for mirror maker.")
+      .withRequiredArg()
+      .describedAs("Arguments passed to message handler constructor.")
+      .ofType(classOf[String])
+
+    val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
+      "Configure the mirror maker to exit on a failed send.")
+      .withRequiredArg()
+      .describedAs("Stop the entire mirror maker when a send failure occurs")
+      .ofType(classOf[String])
+      .defaultsTo("true")
+
+    options = parser.parse(args: _*)
+
+    def checkArgs() = {
+      CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
+      val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
+
+      if (!options.has(whitelistOpt)) {
+        error("whitelist must be specified")
+        sys.exit(1)
+      }
+
+      if (!consumerProps.containsKey(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
+        System.err.println("WARNING: The default partition assignment strategy of the mirror maker will " +
+          "change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If " +
+          "you prefer to make this switch in advance of that release add the following to the corresponding " +
+          "config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'")
+
+      abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
+      offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
+      val numStreams = options.valueOf(numStreamsOpt).intValue()
+
+      Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
+        override def run() {
+          cleanShutdown()
+        }
+      })
+
+      // create producer
+      val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+      val sync = producerProps.getProperty("producer.type", "async").equals("sync")
+      producerProps.remove("producer.type")
+      // Defaults to no data loss settings.
+      maybeSetDefaultProperty(producerProps, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Int.MaxValue.toString)
+      maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
+      maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
+      maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+      // Always set producer key and value serializer to ByteArraySerializer.
+      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+      producer = new MirrorMakerProducer(sync, producerProps)
+
+      // Create consumers
+      val customRebalanceListener: Option[ConsumerRebalanceListener] = {
+        val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
+        if (customRebalanceListenerClass != null) {
+          val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
+          if (rebalanceListenerArgs != null)
+            Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
+          else
+            Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
+        } else {
+          None
+        }
+      }
+      val mirrorMakerConsumers = createConsumers(
+        numStreams,
+        consumerProps,
+        customRebalanceListener,
+        Option(options.valueOf(whitelistOpt)))
+
+      // Create mirror maker threads.
+      mirrorMakerThreads = (0 until numStreams) map (i =>
+        new MirrorMakerThread(mirrorMakerConsumers(i), i))
+
+      // Create and initialize message handler
+      val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
+      val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
+      messageHandler = {
+        if (customMessageHandlerClass != null) {
+          if (messageHandlerArgs != null)
+            CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
+          else
+            CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
+        } else {
+          defaultMirrorMakerMessageHandler
+        }
+      }
+    }
+  }
 }
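
The maybeSetDefaultProperty block above applies the mirror maker's no-data-loss producer defaults without overriding anything the user set in producer.config. Below is a stand-alone sketch of that precedence, assuming maybeSetDefaultProperty behaves like the setIfAbsent helper here; NoDataLossDefaultsSketch is illustrative and not part of this commit.

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    object NoDataLossDefaultsSketch {
      // Set a property only when the user's producer.config did not set it.
      def setIfAbsent(props: Properties, key: String, value: String): Unit =
        if (!props.containsKey(key)) props.setProperty(key, value)

      def withNoDataLossDefaults(props: Properties): Properties = {
        setIfAbsent(props, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Int.MaxValue.toString)
        setIfAbsent(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
        setIfAbsent(props, ProducerConfig.ACKS_CONFIG, "all")
        setIfAbsent(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
        props
      }

      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty(ProducerConfig.ACKS_CONFIG, "1") // explicit user setting wins
        println(withNoDataLossDefaults(props))              // acks stays "1", the rest are defaulted
      }
    }
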
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
index 264ae6a..836163c 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -17,11 +17,10 @@
 
 package kafka.tools
 
-import joptsimple.OptionParser
+import kafka.utils.CommandDefaultOptions
 
 
-class PerfConfig(args: Array[String]) {
-  val parser = new OptionParser(false)
+class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) {
   val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume")
     .withRequiredArg
     .describedAs("count")
@@ -38,5 +37,4 @@ class PerfConfig(args: Array[String]) {
     .ofType(classOf[String])
     .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")
   val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ")
-  val helpOpt = parser.accepts("help", "Print usage.")
 }
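
With PerfConfig now extending CommandDefaultOptions, its subclasses inherit parser, options and helpOpt instead of declaring their own. A hypothetical subclass, for illustration only, might look like this:

    import kafka.tools.PerfConfig
    import kafka.utils.CommandLineUtils

    // Hypothetical subclass: parser, options and helpOpt are all inherited
    // from CommandDefaultOptions via PerfConfig.
    class ExamplePerfConfig(args: Array[String]) extends PerfConfig(args) {
      val topicOpt = parser.accepts("topic", "REQUIRED: The topic to test on.")
        .withRequiredArg
        .describedAs("topic")
        .ofType(classOf[String])

      options = parser.parse(args: _*)
      CommandLineUtils.printHelpAndExitIfNeeded(this, "Hypothetical perf tool that sends or consumes test messages.")
      CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)
    }
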
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 967901c..40d285a 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -97,9 +97,26 @@ public class StreamsResetter {
     private static OptionSpec<String> fromFileOption;
     private static OptionSpec<Long> shiftByOption;
     private static OptionSpecBuilder dryRunOption;
+    private static OptionSpecBuilder helpOption;
     private static OptionSpecBuilder executeOption;
     private static OptionSpec<String> commandConfigOption;
 
+    private static String usage = "This tool helps to quickly reset an application in order to reprocess "
+            + "its data from scratch.\n"
+            + "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of "
+            + "intermediate topics (topics used in the through() method).\n"
+            + "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "
+            + "\"<application.id>-\").\n"
+            + "You do not need to specify internal topics because the tool finds them automatically.\n"
+            + "* This tool will not delete output topics (if you want to delete them, you need to do it yourself "
+            + "with the bin/kafka-topics.sh command).\n"
+            + "* This tool will not clean up the local state on the stream application instances (the persisted "
+            + "stores used to cache aggregation results).\n"
+            + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the "
+            + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n\n"
+            + "*** Important! You will get wrong output if you don't clean up the local stores after running the "
+            + "reset tool!\n\n";
+
     private OptionSet options = null;
     private final List<String> allTopics = new LinkedList<>();
 
@@ -214,12 +231,16 @@ public class StreamsResetter {
             .describedAs("file name");
         executeOption = optionParser.accepts("execute", "Execute the command.");
         dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
+        helpOption = optionParser.accepts("help", "Print usage information.");
 
         // TODO: deprecated in 1.0; can be removed eventually: https://issues.apache.org/jira/browse/KAFKA-7606
         optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
 
         try {
             options = optionParser.parse(args);
+            if (args.length == 0 || options.has(helpOption)) {
+                CommandLineUtils.printUsageAndDie(optionParser, usage);
+            }
         } catch (final OptionException e) {
             printHelp(optionParser);
             throw e;
@@ -626,23 +647,8 @@ public class StreamsResetter {
             && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
     }
 
-    private void printHelp(final OptionParser parser) throws IOException {
-        System.err.println("The Streams Reset Tool allows you to quickly reset an application in order to reprocess "
-                + "its data from scratch.\n"
-                + "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of "
-                + "intermediate topics (topics used in the through() method).\n"
-                + "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "
-                + "\"<application.id>-\").\n"
-                + "You do not need to specify internal topics because the tool finds them automatically.\n"
-                + "* This tool will not delete output topics (if you want to delete them, you need to do it yourself "
-                + "with the bin/kafka-topics.sh command).\n"
-                + "* This tool will not clean up the local state on the stream application instances (the persisted "
-                + "stores used to cache aggregation results).\n"
-                + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the "
-                + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n\n"
-                + "*** Important! You will get wrong output if you don't clean up the local stores after running the "
-                + "reset tool!\n\n"
-        );
+    private void printHelp(OptionParser parser) throws IOException {
+        System.err.println(usage);
         parser.printHelpOn(System.err);
     }
 
diff --git a/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala b/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala
new file mode 100644
index 0000000..096fa95
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala
@@ -0,0 +1,26 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.utils
+
+import joptsimple.{OptionParser, OptionSet}
+
+abstract class CommandDefaultOptions(val args: Array[String], allowCommandOptionAbbreviation: Boolean = false) {
+  val parser = new OptionParser(allowCommandOptionAbbreviation)
+  val helpOpt = parser.accepts("help", "Print usage information.")
+  var options: OptionSet = _
+}
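
The intended usage of the new base class: a tool's options class extends CommandDefaultOptions and declares only its own flags, while the empty-args and --help handling is delegated to CommandLineUtils.printHelpAndExitIfNeeded. A minimal sketch follows; FooCommand and its --config option are hypothetical.

    import kafka.utils.{CommandDefaultOptions, CommandLineUtils}

    object FooCommand {
      // Hypothetical options class: --config is the only tool-specific flag,
      // --help comes from CommandDefaultOptions.
      class FooCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
        val configOpt = parser.accepts("config", "REQUIRED: Path to the config file.")
          .withRequiredArg
          .describedAs("config file")
          .ofType(classOf[String])
        options = parser.parse(args: _*)
      }

      def main(args: Array[String]): Unit = {
        val opts = new FooCommandOptions(args)
        // Prints usage and exits when the command is run with no arguments or with --help.
        CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool does foo.")
        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.configOpt)
        println(s"config file: ${opts.options.valueOf(opts.configOpt)}")
      }
    }
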
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 700a137..728f033 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -16,21 +16,44 @@
  */
  package kafka.utils
 
-import joptsimple.{OptionSpec, OptionSet, OptionParser}
-import scala.collection.Set
 import java.util.Properties
 
- /**
- * Helper functions for dealing with command line utilities
- */
+import joptsimple.{OptionParser, OptionSet, OptionSpec}
+
+import scala.collection.Set
+
+/**
+  * Helper functions for dealing with command line utilities
+  */
 object CommandLineUtils extends Logging {
+  /**
+    * Check if there are no options or `--help` option from command line
+    *
+    * @param commandOpts Acceptable options for a command
+    * @return true on matching the help check condition
+    */
+  def isPrintHelpNeeded(commandOpts: CommandDefaultOptions): Boolean = {
+    return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt)
+  }
+
+  /**
+    * Check and print help message if there is no options or `--help` option
+    * from command line
+    *
+    * @param commandOpts Acceptable options for a command
+    * @param message     Message to display on successful check
+    */
+  def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: String) = {
+    if (isPrintHelpNeeded(commandOpts))
+      printUsageAndDie(commandOpts.parser, message)
+  }
 
   /**
    * Check that all the listed options are present
    */
   def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
     for (arg <- required) {
-      if(!options.has(arg))
+      if (!options.has(arg))
         printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
     }
   }
@@ -39,9 +62,9 @@ object CommandLineUtils extends Logging {
    * Check that none of the listed options are present
    */
   def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) {
-    if(options.has(usedOption)) {
-      for(arg <- invalidOptions) {
-        if(options.has(arg))
+    if (options.has(usedOption)) {
+      for (arg <- invalidOptions) {
+        if (options.has(arg))
           printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"")
       }
     }
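
The pre-existing checkRequiredArgs and checkInvalidArgs helpers touched above are typically combined inside a command's checkArgs. A small sketch, with hypothetical --bootstrap-server and --zookeeper options standing in for a real tool's flags:

    import joptsimple.{OptionParser, OptionSpec}
    import kafka.utils.CommandLineUtils

    object CheckArgsSketch {
      def main(args: Array[String]): Unit = {
        val parser = new OptionParser(false)
        val bootstrapServerOpt = parser.accepts("bootstrap-server", "The broker to connect to.")
          .withRequiredArg.ofType(classOf[String])
        val zookeeperOpt = parser.accepts("zookeeper", "DEPRECATED: The ZooKeeper connect string.")
          .withRequiredArg.ofType(classOf[String])
        val options = parser.parse(args: _*)

        // Die with usage unless --bootstrap-server was given ...
        CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
        // ... and reject --zookeeper whenever --bootstrap-server is used.
        CommandLineUtils.checkInvalidArgs(parser, options, bootstrapServerOpt, Set[OptionSpec[_]](zookeeperOpt))
      }
    }
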
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
index a85acf9..4446773 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
@@ -93,7 +93,7 @@ class ReassignPartitionsCommandArgsTest extends JUnitSuite {
   @Test
   def shouldFailIfNoArgs(): Unit = {
     val args: Array[String]= Array()
-    shouldFailWith("This command moves topic partitions between replicas.", args)
+    shouldFailWith(ReassignPartitionsCommand.helpText, args)
   }
 
   @Test

