kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5588: Remove deprecated --new-consumer tools option (#5097)
Date Wed, 06 Jun 2018 03:28:38 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new be8808d  KAFKA-5588: Remove deprecated --new-consumer tools option (#5097)
be8808d is described below

commit be8808dd4b4c67add4cd3b5adbc54b263a027831
Author: Paolo Patierno <ppatierno@live.com>
AuthorDate: Wed Jun 6 05:28:03 2018 +0200

    KAFKA-5588: Remove deprecated --new-consumer tools option (#5097)
    
    Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 10 ----
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  9 ----
 .../scala/kafka/tools/ConsumerPerformance.scala    | 10 +---
 .../kafka/admin/DeleteConsumerGroupsTest.scala     | 10 +++-
 .../kafka/admin/DescribeConsumerGroupTest.scala    | 10 +++-
 .../unit/kafka/admin/ListConsumerGroupTest.scala   |  6 +++
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala |  8 ++++
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 51 ++++++++++++++------
 .../unit/kafka/tools/ConsumerPerformanceTest.scala | 54 +++++++++++++++++++++-
 docs/upgrade.html                                  |  8 ++++
 tests/kafkatest/services/kafka/kafka.py            | 18 ++++++--
 11 files changed, 142 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 689a63c..44d09fd 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -889,8 +889,6 @@ object ConsumerGroupCommand extends Logging {
       "Pass in just a topic to delete the given topic's partition offsets and ownership information
" +
       "for every consumer group. For instance --topic t1" + nl +
       "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to
use it carefully to only delete groups that are not active."
-    val NewConsumerDoc = "Use the new consumer implementation. This is the default, so this
option is deprecated and " +
-      "will be removed in a future release."
     val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can
be used when describing the group " +
       "to specify the maximum amount of time in milliseconds to wait before the group stabilizes
(when the group is just created, " +
       "or is going through some changes)."
@@ -943,7 +941,6 @@ object ConsumerGroupCommand extends Logging {
     val listOpt = parser.accepts("list", ListDoc)
     val describeOpt = parser.accepts("describe", DescribeDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
-    val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
     val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
                              .withRequiredArg
                              .describedAs("timeout (ms)")
@@ -1011,16 +1008,9 @@ object ConsumerGroupCommand extends Logging {
       if (useOldConsumer) {
         if (options.has(bootstrapServerOpt))
           CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid
with $zkConnectOpt.")
-        else if (options.has(newConsumerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid
with $zkConnectOpt.")
       } else {
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
-        if (options.has(newConsumerOpt)) {
-          Console.err.println(s"The --new-consumer option is deprecated and will be removed
in a future major release. " +
-            s"The new consumer is used by default if the --bootstrap-server option is provided.")
-        }
-
         if (options.has(deleteOpt) && options.has(topicOpt))
           CommandLineUtils.printUsageAndDie(parser, s"When deleting a consumer group the
option $topicOpt is only " +
             s"valid with $zkConnectOpt. The new consumer does not support topic-specific
offset deletion from a consumer group.")
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index c1f8b81..b3103eb 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -324,8 +324,6 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
-    val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.
This is the default, so " +
-      "this option is deprecated and will be removed in a future release.")
     val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED (unless old consumer
is used): The server to connect to.")
       .withRequiredArg
       .describedAs("server to connect to")
@@ -397,8 +395,6 @@ object ConsoleConsumer extends Logging {
     if (useOldConsumer) {
       if (options.has(bootstrapServerOpt))
         CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid
with $zkConnectOpt.")
-      else if (options.has(newConsumerOpt))
-        CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with
$zkConnectOpt.")
       val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
       if (topicOrFilterOpt.size != 1)
         CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic
is required.")
@@ -449,11 +445,6 @@ object ConsoleConsumer extends Logging {
 
     if (!useOldConsumer) {
       CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
-
-      if (options.has(newConsumerOpt)) {
-        Console.err.println("The --new-consumer option is deprecated and will be removed
in a future major release. " +
-          "The new consumer is used by default if the --bootstrap-server option is provided.")
-      }
     }
 
     if (options.has(csvMetricsReporterEnabledOpt)) {
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 4fff877..f4221fe 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -296,8 +296,6 @@ object ConsumerPerformance extends LazyLogging {
       .describedAs("count")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
-    val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.
This is the default, so " +
-      "this option is deprecated and will be removed in a future release.")
     val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties
file.")
       .withRequiredArg
       .describedAs("config file")
@@ -325,11 +323,6 @@ object ConsumerPerformance extends LazyLogging {
     if (!useOldConsumer) {
       CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServersOpt)
 
-      if (options.has(newConsumerOpt)) {
-        Console.err.println("The --new-consumer option is deprecated and will be removed
in a future major release. " +
-          "The new consumer is used by default if the --broker-list option is provided.")
-      }
-
       import org.apache.kafka.clients.consumer.ConsumerConfig
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
       props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
@@ -342,8 +335,7 @@ object ConsumerPerformance extends LazyLogging {
     } else {
       if (options.has(bootstrapServersOpt))
         CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServersOpt is not valid
with $zkConnectOpt.")
-      else if (options.has(newConsumerOpt))
-        CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with
$zkConnectOpt.")
+
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, numMessagesOpt)
       props.put("group.id", options.valueOf(groupIdOpt))
       props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 4cc2837..ef3b17c 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -16,6 +16,7 @@
  */
 package unit.kafka.admin
 
+import joptsimple.OptionException
 import kafka.admin.ConsumerGroupCommandTest
 import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.Errors
@@ -24,12 +25,11 @@ import org.junit.Test
 
 class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @Test(expected = classOf[joptsimple.OptionException])
+  @Test(expected = classOf[OptionException])
   def testDeleteWithTopicOption() {
     TestUtils.createOffsetsTopic(zkClient, servers)
     val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic")
     getConsumerGroupService(cgcArgs)
-    fail("Expected an error due to presence of mutually exclusive options")
   }
 
   @Test
@@ -222,4 +222,10 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
       result.size == 1 &&
         result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE))
   }
+
+  @Test(expected = classOf[OptionException])
+  def testDeleteWithUnrecognizedNewConsumerOption() {
+    val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--delete", "--group", group)
+    getConsumerGroupService(cgcArgs)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 5725568..a2361b7 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -16,6 +16,7 @@
  */
 package kafka.admin
 
+import joptsimple.OptionException
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.RoundRobinAssignor
 import org.apache.kafka.common.TopicPartition
@@ -112,12 +113,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test(expected = classOf[joptsimple.OptionException])
+  @Test(expected = classOf[OptionException])
   def testDescribeWithMultipleSubActions() {
     TestUtils.createOffsetsTopic(zkClient, servers)
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group,
"--members", "--state")
     getConsumerGroupService(cgcArgs)
-    fail("Expected an error due to presence of mutually exclusive options")
   }
 
   @Test
@@ -662,6 +662,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
+  @Test(expected = classOf[joptsimple.OptionException])
+  def testDescribeWithUnrecognizedNewConsumerOption() {
+    val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--describe",
"--group", group)
+    getConsumerGroupService(cgcArgs)
+    fail("Expected an error due to presence of unrecognized --new-consumer option")
+  }
 
 }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 13dccbe..c83e002 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -18,6 +18,7 @@ package kafka.admin
 
 import java.util.Properties
 
+import joptsimple.OptionException
 import org.junit.Test
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
 import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
@@ -86,4 +87,9 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
   }
 
+  @Test(expected = classOf[OptionException])
+  def testListWithUnrecognizedNewConsumerOption() {
+    val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list")
+    getConsumerGroupService(cgcArgs)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 04fc428..116b455 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,6 +16,7 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Calendar, Date, Properties}
 
+import joptsimple.OptionException
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
@@ -335,6 +336,13 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     adminZkClient.deleteTopic(topic)
   }
 
+  @Test(expected = classOf[OptionException])
+  def testResetWithUnrecognizedNewConsumerOption() {
+    val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--reset-offsets",
"--group", group, "--all-topics",
+      "--to-offset", "2", "--export")
+    getConsumerGroupService(cgcArgs)
+  }
+
   private def produceMessages(topic: String, numMessages: Int): Unit = {
     val records = (0 until numMessages).map(_ => new ProducerRecord[Array[Byte], Array[Byte]](topic,
       new Array[Byte](100 * 1000)))
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 6f46555..1a32bf4 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -149,8 +149,7 @@ class ConsoleConsumerTest {
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",
       "--topic", "test",
-      "--from-beginning",
-      "--new-consumer") //new
+      "--from-beginning")
 
     //When
     val config = new ConsoleConsumer.ConsumerConfig(args)
@@ -169,8 +168,7 @@ class ConsoleConsumerTest {
       "--bootstrap-server", "localhost:9092",
       "--topic", "test",
       "--partition", "0",
-      "--offset", "3",
-      "--new-consumer") //new
+      "--offset", "3")
 
     //When
     val config = new ConsoleConsumer.ConsumerConfig(args)
@@ -185,6 +183,25 @@ class ConsoleConsumerTest {
 
   }
 
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnUnrecognizedNewConsumerOption(): Unit = {
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+
+    //Given
+    val args: Array[String] = Array(
+      "--new-consumer",
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--from-beginning")
+
+    //When
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
+
   @Test
   def testDefaultConsumer() {
     //Given
@@ -201,6 +218,21 @@ class ConsoleConsumerTest {
   }
 
   @Test
+  def testNewConsumerRemovedOption() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+  }
+
+  @Test
   def shouldParseValidNewSimpleConsumerValidConfigWithStringOffset() {
     //Given
     val args: Array[String] = Array(
@@ -208,7 +240,6 @@ class ConsoleConsumerTest {
       "--topic", "test",
       "--partition", "0",
       "--offset", "LatEst",
-      "--new-consumer", //new
       "--property", "print.value=false")
 
     //When
@@ -366,9 +397,6 @@ class ConsoleConsumerTest {
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningNewConsumer() {
-
-    // Override exit procedure to throw an exception instead of exiting, so we can catch
the exit
-    // properly for this test case
     Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
 
     //Given
@@ -384,15 +412,10 @@ class ConsoleConsumerTest {
     } finally {
       Exit.resetExitProcedure()
     }
-
-    fail("Expected consumer property construction to fail due to inconsistent reset options")
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningOldConsumer() {
-
-    // Override exit procedure to throw an exception instead of exiting, so we can catch
the exit
-    // properly for this test case
     Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
 
     //Given
@@ -408,8 +431,6 @@ class ConsoleConsumerTest {
     } finally {
       Exit.resetExitProcedure()
     }
-
-    fail("Expected consumer property construction to fail due to inconsistent reset options")
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
index bafe8ed..bc199f6 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
@@ -20,7 +20,8 @@ package kafka.tools
 import java.io.ByteArrayOutputStream
 import java.text.SimpleDateFormat
 
-import org.junit.Assert.assertEquals
+import joptsimple.OptionException
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.Test
 
 class ConsumerPerformanceTest {
@@ -45,6 +46,57 @@ class ConsumerPerformanceTest {
       s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0"))
   }
 
+  @Test
+  def testConfigUsingNewConsumer(): Unit = {
+    //Given
+    val args: Array[String] = Array(
+      "--broker-list", "localhost:9092",
+      "--topic", "test",
+      "--messages", "10"
+    )
+
+    //When
+    val config = new ConsumerPerformance.ConsumerPerfConfig(args)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.options.valueOf(config.bootstrapServersOpt))
+    assertEquals("test", config.topic)
+    assertEquals(10, config.numMessages)
+  }
+
+  @Test
+  def testConfigUsingOldConsumer() {
+    //Given
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--messages", "10")
+
+    //When
+    val config = new ConsumerPerformance.ConsumerPerfConfig(args)
+
+    //Then
+    assertTrue(config.useOldConsumer)
+    assertEquals("localhost:2181", config.options.valueOf(config.zkConnectOpt))
+    assertEquals("test", config.topic)
+    assertEquals(10, config.numMessages)
+  }
+
+  @Test(expected = classOf[OptionException])
+  def testConfigUsingNewConsumerUnrecognizedOption(): Unit = {
+    //Given
+    val args: Array[String] = Array(
+      "--broker-list", "localhost:9092",
+      "--topic", "test",
+      "--messages", "10",
+      "--new-consumer"
+    )
+
+    //When
+    new ConsumerPerformance.ConsumerPerfConfig(args)
+  }
+
   private def testHeaderMatchContent(detailed: Boolean, useOldConsumer: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = {
     Console.withOut(outContent) {
       ConsumerPerformance.printHeader(detailed, useOldConsumer)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index a399a62..532c8bc 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -98,6 +98,14 @@
         will be removed in a future version.</li>
     <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
     <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
+    <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> finally removes
+        the <code>--new-consumer</code> option for all consumer based tools as <code>kafka-console-consumer</code>, <code>kafka-consumer-perf-test</code>
+        and <code>kafka-consumer-groups</code>.
+        The new consumer is automatically used if the bootstrap servers list is provided on the command line
+        otherwise, when the zookeeper connection is provided, the old consumer is used.
+        The <code>--new-consumer</code> option had already been ignored as the way of selecting the consumer since Kafka 1.0.0,
+        this KIP just removes the option.
+    </li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index ba5abc7..7e919b3 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -30,7 +30,7 @@ from kafkatest.services.kafka import config_property
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
 
 Port = collections.namedtuple('Port', ['name', 'number', 'open'])
 
@@ -584,8 +584,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             command_config = "--command-config " + command_config
 
         if new_consumer:
-            cmd = "%s --new-consumer --bootstrap-server %s %s --list" % \
+            new_consumer_opt = ""
+            if node.version <= LATEST_0_10_0:
+                new_consumer_opt = "--new-consumer"
+            cmd = "%s %s --bootstrap-server %s %s --list" % \
                   (consumer_group_script,
+                   new_consumer_opt,
                    self.bootstrap_servers(self.security_protocol),
                    command_config)
         else:
@@ -611,8 +615,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             command_config = "--command-config " + command_config
 
         if new_consumer:
-            cmd = "%s --new-consumer --bootstrap-server %s %s --group %s --describe" % \
-                  (consumer_group_script, self.bootstrap_servers(self.security_protocol), command_config, group)
+            new_consumer_opt = ""
+            if node.version <= LATEST_0_10_0:
+                new_consumer_opt = "--new-consumer"
+            cmd = "%s %s --bootstrap-server %s %s --group %s --describe" % \
+                  (consumer_group_script,
+                   new_consumer_opt,
+                   self.bootstrap_servers(self.security_protocol),
+                   command_config, group)
         else:
             cmd = "%s --zookeeper %s %s --group %s --describe" % \
                   (consumer_group_script, self.zk_connect_setting(), command_config, group)

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.

Mime
View raw message