kafka-commits mailing list archives

From: ewe...@apache.org
Subject: kafka git commit: KAFKA-3176: Add partition/offset options to the new consumer
Date: Tue, 21 Jun 2016 05:28:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 50781b75d -> 971676881


KAFKA-3176: Add partition/offset options to the new consumer

With this pull request the new console consumer accepts optional --partition and --offset
arguments, so that only messages from a particular partition, starting from a particular
offset, are consumed.

The following rules are also enforced to avoid invalid combinations of arguments (an example
invocation is shown after the list):
- If --partition or --offset is provided, --new-consumer must be provided too.
- If --partition is provided, --topic must be provided too.
- If --offset is provided, --partition must be provided too.
- --offset and --from-beginning cannot be used at the same time.
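For example, an invocation along the following lines (the broker address, topic name, partition,
and offset are illustrative placeholders taken from the added tests) consumes partition 0 of
topic "test" starting at offset 3:

  bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 \
    --topic test --partition 0 --offset 3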

This patch was co-authored with rajinisivaram.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #922 from vahidhashemian/KAFKA-3176


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97167688
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97167688
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97167688

Branch: refs/heads/trunk
Commit: 9716768810635b75b7668730962c399048ca883e
Parents: 50781b7
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Mon Jun 20 22:28:27 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jun 20 22:28:27 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/BaseConsumer.scala     | 42 +++++++++---
 .../scala/kafka/tools/ConsoleConsumer.scala     | 72 ++++++++++++++++----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 43 ++++++++++++
 3 files changed, 134 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/97167688/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 8252cfc..b39da19 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -20,10 +20,22 @@ package kafka.consumer
 import java.util.Properties
 import java.util.regex.Pattern
 
+import kafka.api.FetchRequestBuilder
+import kafka.api.OffsetRequest
+import kafka.api.Request
+import kafka.client.ClientUtils
+import kafka.cluster.BrokerEndPoint
 import kafka.common.StreamEndException
 import kafka.message.Message
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndOffset
+import kafka.utils.ToolsUtils
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.TopicPartition
 
 /**
  * A base consumer used to abstract both old and new consumer
@@ -45,21 +57,34 @@ case class BaseConsumerRecord(topic: String,
                               key: Array[Byte],
                               value: Array[Byte])
 
-class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
+class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
 
   import scala.collection.JavaConversions._
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-  if (topic.isDefined)
-    consumer.subscribe(List(topic.get))
-  else if (whitelist.isDefined)
-    consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
-  else
-    throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
-
+  consumerInit()
   var recordIter = consumer.poll(0).iterator
 
+  def consumerInit() {
+    if (topic.isDefined)
+      if (partitionId.isDefined) {
+        val topicPartition = new TopicPartition(topic.get, partitionId.get)
+        consumer.assign(List(topicPartition))
+        offset.get match {
+          case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition))
+          case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition))
+          case _ => consumer.seek(topicPartition, offset.get)
+        }
+      }
+      else
+        consumer.subscribe(List(topic.get))
+    else if (whitelist.isDefined)
+      consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
+    else
+      throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
+  }
+
   override def receive(): BaseConsumerRecord = {
     if (!recordIter.hasNext) {
       recordIter = consumer.poll(timeoutMs).iterator
@@ -124,4 +149,3 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
     this.consumerConnector.commitOffsets
   }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/97167688/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index c1b5aee..0b6502a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch
 import java.util.{Properties, Random}
 
 import joptsimple._
+import kafka.api.OffsetRequest
 import kafka.common.{MessageFormatter, StreamEndException}
 import kafka.consumer._
 import kafka.message._
@@ -61,7 +62,10 @@ object ConsoleConsumer extends Logging {
     val consumer =
       if (conf.useNewConsumer) {
         val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
-        new NewShinyConsumer(Option(conf.topicArg), Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
+        if (conf.partitionArg.isDefined)
+          new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, getNewConsumerProps(conf), timeoutMs)
+        else
+          new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
       } else {
         checkZk(conf)
         new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
@@ -213,6 +217,15 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("blacklist")
       .ofType(classOf[String])
+    val partitionIdOpt = parser.accepts("partition", "The partition to consume from.")
+      .withRequiredArg
+      .describedAs("partition")
+      .ofType(classOf[java.lang.Integer])
+    val offsetOpt = parser.accepts("offset", "The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end")
+      .withRequiredArg
+      .describedAs("consume offset")
+      .ofType(classOf[String])
+      .defaultsTo("earliest")
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
       "Multiple URLS can be given to allow fail-over.")
       .withRequiredArg
@@ -283,20 +296,9 @@ object ConsoleConsumer extends Logging {
     // If using new consumer, topic must be specified.
     var topicArg: String = null
     var whitelistArg: String = null
+    var partitionArg: Option[Int] = None
+    var offsetArg: Long = OffsetRequest.LatestTime
     var filterSpec: TopicFilter = null
-    if (useNewConsumer) {
-      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
-      if (topicOrFilterOpt.size != 1)
-        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
-      topicArg = options.valueOf(topicIdOpt)
-      whitelistArg = options.valueOf(whitelistOpt)
-    } else {
-      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-      if (topicOrFilterOpt.size != 1)
-        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic
is required.")
-      topicArg = options.valueOf(topicOrFilterOpt.head)
-      filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
-    }
     val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
     val consumerProps = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
@@ -304,6 +306,7 @@ object ConsoleConsumer extends Logging {
       new Properties()
     val zkConnectionStr = options.valueOf(zkConnectOpt)
     val fromBeginning = options.has(resetBeginningOpt)
+    partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
@@ -315,6 +318,47 @@ object ConsoleConsumer extends Logging {
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
+    if (useNewConsumer) {
+      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
+      if (topicOrFilterOpt.size != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
+      topicArg = options.valueOf(topicIdOpt)
+      whitelistArg = options.valueOf(whitelistOpt)
+    } else {
+      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+      if (topicOrFilterOpt.size != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic
is required.")
+      topicArg = options.valueOf(topicOrFilterOpt.head)
+      filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+    }
+    
+    if (partitionArg.isDefined) {
+      if (!useNewConsumer)
+        CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is
supported in the new consumer only.")
+      if (!options.has(topicIdOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The topic is required when partition is
specified.")
+      if (fromBeginning && options.has(offsetOpt))
+        CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and offset cannot
be specified together.")
+      if (options.has(offsetOpt) &&
+          !(options.valueOf(offsetOpt).toLowerCase().equals("earliest") ||
+            options.valueOf(offsetOpt).toLowerCase().equals("latest") ||
+            (options.valueOf(offsetOpt) forall Character.isDigit)))
+        CommandLineUtils.printUsageAndDie(parser, "The provided offset value is incorrect.
Valid values are 'earliest', 'latest', or non-negative numbers.")
+    } else if (options.has(offsetOpt)) {
+      if (!useNewConsumer)
+        CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is
supported in the new consumer only.")
+      else
+        CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset
is specified.")
+    }
+
+    offsetArg = if (options.has(offsetOpt)) {
+      options.valueOf(offsetOpt).toLowerCase() match {
+        case "earliest" => OffsetRequest.EarliestTime
+        case "latest" => OffsetRequest.LatestTime
+        case _ => options.valueOf(offsetOpt).toLong
+      }
+    } else if (fromBeginning) OffsetRequest.EarliestTime else OffsetRequest.LatestTime
+
     CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)
 
     if (options.has(csvMetricsReporterEnabledOpt)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/97167688/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index c3ebade..63be9c4 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -114,6 +114,49 @@ class ConsoleConsumerTest extends JUnitSuite {
     assertEquals(true, config.fromBeginning)
   }
 
+  @Test
+  def shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--partition", "0",
+      "--offset", "3",
+      "--new-consumer") //new
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertTrue(config.useNewConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(0, config.partitionArg.get)
+    assertEquals(3, config.offsetArg)
+    assertEquals(false, config.fromBeginning)
+  }
+
+  @Test
+  def shouldParseValidNewSimpleConsumerValidConfigWithStringOffset() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--partition", "0",
+      "--offset", "LatEst",
+      "--new-consumer") //new
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertTrue(config.useNewConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(0, config.partitionArg.get)
+    assertEquals(-1, config.offsetArg)
+    assertEquals(false, config.fromBeginning)
+  }
 
   @Test
   def shouldParseConfigsFromFile() {

