Repository: kafka
Updated Branches:
refs/heads/trunk a423316d3 -> a9e4ce79f
KAFKA-2734: kafka-console-consumer should allow empty topic for old consumer.
…not specifying topic
Author: Ashish Singh <asingh@cloudera.com>
Reviewers: Sriharsha Chintalapani, Guozhang Wang
Closes #412 from SinghAsDev/KAFKA-2734
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9e4ce79
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9e4ce79
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9e4ce79
Branch: refs/heads/trunk
Commit: a9e4ce79fe81d3fc8c9841ac9a14db53d3102854
Parents: a423316
Author: Ashish Singh <asingh@cloudera.com>
Authored: Fri Nov 6 13:42:18 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Nov 6 13:42:18 2015 -0800
----------------------------------------------------------------------
.../scala/kafka/tools/ConsoleConsumer.scala | 23 ++++++++++++++------
1 file changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9e4ce79/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 639360c..9f296bd 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -239,10 +239,22 @@ object ConsoleConsumer extends Logging {
var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
val useNewConsumer = options.has(useNewConsumerOpt)
- val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has)
- val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
- val topicArg = options.valueOf(topicOrFilterOpt.head)
- val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+
+ // If using old consumer, exactly one of whitelist/blacklist/topic is required.
+ // If using new consumer, topic must be specified.
+ var topicArg: String = null
+ var filterSpec: TopicFilter = null
+ if (useNewConsumer) {
+ if (!options.has(topicIdOpt))
+ CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.")
+ topicArg = options.valueOf(topicIdOpt)
+ } else {
+ val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+ if (topicOrFilterOpt.size != 1)
+ CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
+ topicArg = options.valueOf(topicOrFilterOpt.head)
+ filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+ }
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
@@ -262,9 +274,6 @@ object ConsoleConsumer extends Logging {
CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)
- if (!useNewConsumer && topicOrFilterOpt.size != 1)
- CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
-
if (options.has(csvMetricsReporterEnabledOpt)) {
val csvReporterProps = new Properties()
csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
|