kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: HOTFIX: ConsoleConsumer using wrong old consumer config value for auto.offset.reset
Date Mon, 14 Aug 2017 20:52:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c0f7a7705 -> 869ef5579


HOTFIX: ConsoleConsumer using wrong old consumer config value for auto.offset.reset

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3665 from hachikuji/hotfix-auto-reset-console-consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/869ef557
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/869ef557
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/869ef557

Branch: refs/heads/trunk
Commit: 869ef5579f00661a5d68e92f09e088e7ed884189
Parents: c0f7a77
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Aug 14 13:52:02 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Aug 14 13:52:02 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     |  43 +++++---
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 108 ++++++++++++++++---
 2 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/869ef557/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index ad0844a..e81dba2 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -63,7 +63,9 @@ object ConsoleConsumer extends Logging {
     val consumer =
       if (conf.useOldConsumer) {
         checkZk(conf)
-        new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
+        val props = getOldConsumerProps(conf)
+        checkAndMaybeDeleteOldPath(conf, props)
+        new OldConsumer(conf.filterSpec, props)
       } else {
         val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
         if (conf.partitionArg.isDefined)
@@ -176,21 +178,26 @@ object ConsoleConsumer extends Logging {
     setAutoOffsetResetValue(config, props)
     props.put("zookeeper.connect", config.zkConnectionStr)
 
-    if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt)
&&
-      checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id")
+ "/offsets")) {
-      System.err.println("Found previous offset information for this group " + props.getProperty("group.id")
-        + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
-      Exit.exit(1)
-    }
-
-    if (config.options.has(config.deleteConsumerOffsetsOpt))
-      ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/"
+ config.consumerProps.getProperty("group.id"))
     if (config.timeoutMs >= 0)
       props.put("consumer.timeout.ms", config.timeoutMs.toString)
 
     props
   }
 
+  def checkAndMaybeDeleteOldPath(config: ConsumerConfig, props: Properties) = {
+    val consumerGroupBasePath = "/consumers/" + props.getProperty("group.id")
+    if (config.options.has(config.deleteConsumerOffsetsOpt)) {
+      ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), consumerGroupBasePath)
+    } else {
+      val resetToBeginning = OffsetRequest.SmallestTimeString == props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      if (resetToBeginning && checkZkPathExists(config.options.valueOf(config.zkConnectOpt),
consumerGroupBasePath + "/offsets")) {
+        System.err.println("Found previous offset information for this group " + props.getProperty("group.id")
+          + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
+        Exit.exit(1)
+      }
+    }
+  }
+
   def getNewConsumerProps(config: ConsumerConfig): Properties = {
     val props = new Properties
 
@@ -216,19 +223,27 @@ object ConsoleConsumer extends Logging {
     * are conflicting.
     */
   def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties) {
+    val (earliestConfigValue, latestConfigValue) = if (config.useOldConsumer)
+      (OffsetRequest.SmallestTimeString, OffsetRequest.LargestTimeString)
+    else
+      ("earliest", "latest")
+
     if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
       // auto.offset.reset parameter was specified on the command line
-      if (config.options.has(config.resetBeginningOpt) && "latest".equals(props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))
{
+      val autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      if (config.options.has(config.resetBeginningOpt) && earliestConfigValue !=
autoResetOption) {
         // conflicting options - latest und earliest, throw an error
-        System.err.println("Can't simultaneously specify --from-beginning and 'auto.offset.reset=latest',
please remove one option")
+        System.err.println(s"Can't simultaneously specify --from-beginning and 'auto.offset.reset=$autoResetOption',
" +
+          "please remove one option")
         Exit.exit(1)
       }
       // nothing to do, checking for valid parameter values happens later and the specified
       // value was already copied during .putall operation
     } else {
       // no explicit value for auto.offset.reset was specified
-      // if --from-beginning was specified use "earliest", otherwise default to "latest"
-      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt))
"earliest" else "latest")
+      // if --from-beginning was specified use earliest, otherwise default to latest
+      val autoResetOption = if (config.options.has(config.resetBeginningOpt)) earliestConfigValue
else latestConfigValue
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/869ef557/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index c188b41..9fbd3df 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -177,12 +177,12 @@ class ConsoleConsumerTest {
   }
 
   @Test
-  def shouldParseValidOldConsumerValidConfigWithAutoOffsetReset() {
+  def shouldParseValidOldConsumerConfigWithAutoOffsetResetSmallest() {
     //Given
     val args: Array[String] = Array(
       "--zookeeper", "localhost:2181",
       "--topic", "test",
-      "--consumer-property", "auto.offset.reset=earliest")
+      "--consumer-property", "auto.offset.reset=smallest")
 
     //When
     val config = new ConsoleConsumer.ConsumerConfig(args)
@@ -193,11 +193,51 @@ class ConsoleConsumerTest {
     assertEquals("localhost:2181", config.zkConnectionStr)
     assertEquals("test", config.topicArg)
     assertEquals(false, config.fromBeginning)
-    assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+    assertEquals("smallest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
   }
 
   @Test
-  def shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetReset() {
+  def shouldParseValidOldConsumerConfigWithAutoOffsetResetLargest() {
+    //Given
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=largest")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getOldConsumerProps(config)
+
+    //Then
+    assertTrue(config.useOldConsumer)
+    assertEquals("localhost:2181", config.zkConnectionStr)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("largest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldSetAutoResetToSmallestWhenFromBeginningConfigured() {
+    //Given
+    val args = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getOldConsumerProps(config)
+
+    //Then
+    assertTrue(config.useOldConsumer)
+    assertEquals("localhost:2181", config.zkConnectionStr)
+    assertEquals("test", config.topicArg)
+    assertEquals(true, config.fromBeginning)
+    assertEquals("smallest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldParseValidNewConsumerConfigWithAutoOffsetResetLatest() {
     //Given
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",
@@ -217,7 +257,27 @@ class ConsoleConsumerTest {
   }
 
   @Test
-  def shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetResetAndMatchingFromBeginning()
{
+  def shouldParseValidNewConsumerConfigWithAutoOffsetResetEarliest() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=earliest")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldParseValidNewConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning() {
     //Given
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",
@@ -238,7 +298,7 @@ class ConsoleConsumerTest {
   }
 
   @Test
-  def shouldParseValidNewSimpleConsumerValidConfigWithNoOffsetReset() {
+  def shouldParseValidNewConsumerConfigWithNoOffsetReset() {
     //Given
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",
@@ -257,7 +317,7 @@ class ConsoleConsumerTest {
   }
 
   @Test(expected = classOf[IllegalArgumentException])
-  def shouldStopWheValidConfigWithAutoOffsetResetAndConflictingFromBeginning() {
+  def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningNewConsumer()
{
 
     // Override exit procedure to throw an exception instead of exiting, so we can catch
the exit
     // properly for this test case
@@ -270,18 +330,38 @@ class ConsoleConsumerTest {
       "--consumer-property", "auto.offset.reset=latest",
       "--from-beginning")
 
-    // Enclose test calls in try-finally to ensure the exit procedure is
-    // reset at the end
     try {
       val config = new ConsoleConsumer.ConsumerConfig(args)
-      val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
-    } finally
-    {
+      ConsoleConsumer.getNewConsumerProps(config)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+
+    fail("Expected consumer property construction to fail due to inconsistent reset options")
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningOldConsumer()
{
+
+    // Override exit procedure to throw an exception instead of exiting, so we can catch
the exit
+    // properly for this test case
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+
+    //Given
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=largest",
+      "--from-beginning")
+
+    try {
+      val config = new ConsoleConsumer.ConsumerConfig(args)
+      ConsoleConsumer.getOldConsumerProps(config)
+    } finally {
       Exit.resetExitProcedure()
     }
 
-    // Should have thrown an exception before here, if we reach this line we can fail the
test
-    fail()
+    fail("Expected consumer property construction to fail due to inconsistent reset options")
   }
 
   @Test


Mime
View raw message