kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5629; ConsoleConsumer should respect auto.offset.reset if specified on the command line
Date Wed, 09 Aug 2017 20:22:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c35c47981 -> 1ad74f5b7


KAFKA-5629; ConsoleConsumer should respect auto.offset.reset if specified on the command line

Handle the case where the "auto.offset.reset" property is specified on the command line but is
overridden by the code during startup. Currently the ConsoleConsumer silently overrides that
setting, which can lead to confusing behavior.

Author: Soenke Liebau <soenke.liebau@opencore.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3566 from soenkeliebau/KAFKA-5629


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ad74f5b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ad74f5b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ad74f5b

Branch: refs/heads/trunk
Commit: 1ad74f5b760d16f3990a5024e717f4826278675d
Parents: c35c479
Author: Soenke Liebau <soenke.liebau@opencore.com>
Authored: Wed Aug 9 13:22:05 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed Aug 9 13:22:05 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     |  32 +++++-
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 111 ++++++++++++++++++-
 2 files changed, 140 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ad74f5b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index f86b28b..ad0844a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -173,7 +173,7 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
+    setAutoOffsetResetValue(config, props)
     props.put("zookeeper.connect", config.zkConnectionStr)
 
     if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt)
&&
@@ -196,7 +196,7 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt))
"earliest" else "latest")
+    setAutoOffsetResetValue(config, props)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -204,6 +204,34 @@ object ConsoleConsumer extends Logging {
     props
   }
 
+  /**
+    * Used by both getNewConsumerProps and getOldConsumerProps to retrieve the correct value for the
+    * consumer parameter 'auto.offset.reset'.
+    * Order of priority is:
+    *   1. Explicitly set parameter via --consumer.property command line parameter
+    *   2. Explicit --from-beginning given -> 'earliest'
+    *   3. Default value of 'latest'
+    *
+    * In case both --from-beginning and an explicit value are specified an error is thrown if these
+    * are conflicting.
+    */
+  def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties) {
+    if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
+      // auto.offset.reset parameter was specified on the command line
+      if (config.options.has(config.resetBeginningOpt) && "latest".equals(props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))
{
+        // conflicting options - latest and earliest, throw an error
+        System.err.println("Can't simultaneously specify --from-beginning and 'auto.offset.reset=latest',
please remove one option")
+        Exit.exit(1)
+      }
+      // nothing to do: checking for valid parameter values happens later, and the specified
+      // value was already copied during the putAll operation
+    } else {
+      // no explicit value for auto.offset.reset was specified
+      // if --from-beginning was specified use "earliest", otherwise default to "latest"
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt))
"earliest" else "latest")
+    }
+  }
+
   class ConsumerConfig(args: Array[String]) {
     val parser = new OptionParser(false)
     val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ad74f5b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index e0917a2..c188b41 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -21,7 +21,8 @@ import java.io.{PrintStream, FileOutputStream}
 
 import kafka.common.MessageFormatter
 import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
-import kafka.utils.TestUtils
+import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
@@ -176,6 +177,114 @@ class ConsoleConsumerTest {
   }
 
   @Test
+  def shouldParseValidOldConsumerValidConfigWithAutoOffsetReset() {
+    //Given
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=earliest")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getOldConsumerProps(config)
+
+    //Then
+    assertTrue(config.useOldConsumer)
+    assertEquals("localhost:2181", config.zkConnectionStr)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetReset() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=latest")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetResetAndMatchingFromBeginning()
{
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=earliest",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(true, config.fromBeginning)
+    assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldParseValidNewSimpleConsumerValidConfigWithNoOffsetReset() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldStopWheValidConfigWithAutoOffsetResetAndConflictingFromBeginning() {
+
+    // Override exit procedure to throw an exception instead of exiting, so we can catch the exit
+    // properly for this test case
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=latest",
+      "--from-beginning")
+
+    // Enclose test calls in try-finally to ensure the exit procedure is
+    // reset at the end
+    try {
+      val config = new ConsoleConsumer.ConsumerConfig(args)
+      val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+    } finally
+    {
+      Exit.resetExitProcedure()
+    }
+
+    // Should have thrown an exception before here; if we reach this line we can fail the test
+    fail()
+  }
+
+  @Test
   def shouldParseConfigsFromFile() {
     val propsFile = TestUtils.tempFile()
     val propsStream = new FileOutputStream(propsFile)


Mime
View raw message