kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2734: kafka-console-consumer should allow empty topic for old consumer.
Date Fri, 06 Nov 2015 21:36:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a423316d3 -> a9e4ce79f


KAFKA-2734: kafka-console-consumer should allow empty topic for old consumer.

…ot specifying topic

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Sriharsha Chintalapani, Guozhang Wang

Closes #412 from SinghAsDev/KAFKA-2734


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9e4ce79
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9e4ce79
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9e4ce79

Branch: refs/heads/trunk
Commit: a9e4ce79fe81d3fc8c9841ac9a14db53d3102854
Parents: a423316
Author: Ashish Singh <asingh@cloudera.com>
Authored: Fri Nov 6 13:42:18 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Nov 6 13:42:18 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 23 ++++++++++++++------
 1 file changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9e4ce79/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 639360c..9f296bd 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -239,10 +239,22 @@ object ConsoleConsumer extends Logging {
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
     val useNewConsumer = options.has(useNewConsumerOpt)
-    val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has)
-    val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-    val topicArg = options.valueOf(topicOrFilterOpt.head)
-    val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+
+    // If using old consumer, exactly one of whitelist/blacklist/topic is required.
+    // If using new consumer, topic must be specified.
+    var topicArg: String = null
+    var filterSpec: TopicFilter = null
+    if (useNewConsumer) {
+      if (!options.has(topicIdOpt))
+        CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.")
+      topicArg = options.valueOf(topicIdOpt)
+    } else {
+      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+      if (topicOrFilterOpt.size != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
+      topicArg = options.valueOf(topicOrFilterOpt.head)
+      filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+    }
     val consumerProps = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
     else
@@ -262,9 +274,6 @@ object ConsoleConsumer extends Logging {
 
     CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)
 
-    if (!useNewConsumer && topicOrFilterOpt.size != 1)
-      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
-
     if (options.has(csvMetricsReporterEnabledOpt)) {
       val csvReporterProps = new Properties()
       csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")


Mime
View raw message