kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2490: support new consumer in ConsumerGroupCommand
Date Fri, 06 Nov 2015 02:56:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 33e879a38 -> c3c0c04e6


KAFKA-2490: support new consumer in ConsumerGroupCommand

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Guozhang Wang, Jason Gustafson

Closes #299 from SinghAsDev/KAFKA-2490


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c3c0c04e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c3c0c04e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c3c0c04e

Branch: refs/heads/trunk
Commit: c3c0c04e62253f2a9f78b383bbf0d1a04d9b3b25
Parents: 33e879a
Author: Ashish Singh <asingh@cloudera.com>
Authored: Thu Nov 5 18:56:26 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Nov 5 18:56:26 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    |  35 +++-
 .../kafka/admin/ConsumerGroupCommand.scala      | 173 ++++++++++++++-----
 .../integration/kafka/api/AdminClientTest.scala |   2 +-
 3 files changed, 164 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c3c0c04e/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index ddd3114..1c4aa52 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -19,9 +19,9 @@ import kafka.common.KafkaException
 import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
 import kafka.utils.Logging
 import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.DisconnectException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
-import org.apache.kafka.common.{TopicPartition, Cluster, Node}
+import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -147,10 +147,21 @@ class AdminClient(val time: Time,
     GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
   }
 
-  def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = {
+  def describeConsumerGroup(groupId: String): (Map[TopicPartition, String], Map[String, List[TopicPartition]]) = {
     val group = describeGroup(groupId)
+    try {
+      val membersAndTopicPartitions: Map[String, List[TopicPartition]] = getMembersAndTopicPartitions(group)
+      val owners = getOwners(group)
+      (owners, membersAndTopicPartitions)
+    } catch {
+      case (ex: IllegalArgumentException) =>
+        throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group.")
+    }
+  }
+
+  def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = {
     if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
-      throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group")
+      throw new IllegalArgumentException(s"${group} is not a valid GroupSummary")
 
     group.members.map {
       case member =>
@@ -159,6 +170,20 @@ class AdminClient(val time: Time,
     }.toMap
   }
 
+  def getOwners(groupSummary: GroupSummary): Map[TopicPartition, String] = {
+    if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
+      throw new IllegalArgumentException(s"${groupSummary} is not a valid GroupSummary")
+
+    groupSummary.members.flatMap {
+      case member =>
+        val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
+        val partitions = assignment.partitions().asScala.toList
+        partitions.map {
+          case partition: TopicPartition =>
+            partition -> "%s_%s".format(member.memberId, member.clientHost)
+        }.toMap
+    }.toMap
+  }
 }
 
 object AdminClient {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3c0c04e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index a30c12d..91dc4e3 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,21 +18,23 @@
 package kafka.admin
 
 
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import kafka.common._
 import java.util.Properties
+
+import joptsimple.{OptionParser, OptionSpec}
+import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
-import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest}
-import org.I0Itec.zkclient.exception.ZkNoNodeException
-import kafka.common.TopicAndPartition
-import joptsimple.{OptionSpec, OptionParser}
-import scala.collection.{Set, mutable}
+import kafka.common.{TopicAndPartition, _}
 import kafka.consumer.SimpleConsumer
-import collection.JavaConversions._
-import org.apache.kafka.common.utils.Utils
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConversions._
+import scala.collection.{Set, mutable}
 
 object ConsumerGroupCommand {
 
@@ -56,7 +58,7 @@ object ConsumerGroupCommand {
 
     try {
       if (opts.options.has(opts.listOpt))
-        list(zkUtils)
+        list(zkUtils, opts)
       else if (opts.options.has(opts.describeOpt))
         describe(zkUtils, opts)
       else if (opts.options.has(opts.deleteOpt))
@@ -70,20 +72,46 @@ object ConsumerGroupCommand {
     }
   }
 
-  def list(zkUtils: ZkUtils) {
-    zkUtils.getConsumerGroups().foreach(println)
+  def list(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
+    val useNewConsumer = opts.options.has(opts.newConsumerOpt)
+    if (!useNewConsumer)
+      zkUtils.getConsumerGroups().foreach(println)
+    else {
+      val adminClient = createAndGetAdminClient(opts)
+      adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
+    }
+  }
+
+  def createAndGetAdminClient(opts: ConsumerGroupCommandOptions): AdminClient = {
+    AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
   }
 
   def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
+    val useNewConsumer = opts.options.has(opts.newConsumerOpt)
+    val group = opts.options.valueOf(opts.groupOpt)
     val configs = parseConfigs(opts)
     val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
     val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
-    val group = opts.options.valueOf(opts.groupOpt)
-    val topics = zkUtils.getTopicsByConsumerGroup(group)
-    if (topics.isEmpty) {
+    def warnNoTopicsForGroupFound: Unit = {
       println("No topic available for consumer group provided")
     }
-    topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
+
+    println("%s, %s, %s, %s, %s, %s, %s"
+      .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
+
+    if (!useNewConsumer) {
+      val topics = zkUtils.getTopicsByConsumerGroup(group)
+      if (topics.isEmpty) {
+        warnNoTopicsForGroupFound
+      }
+      topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
+    } else {
+      val (owners, groupAndTopicPartitions) = createAndGetAdminClient(opts).describeConsumerGroup(group)
+
+      if (groupAndTopicPartitions.isEmpty)
+        warnNoTopicsForGroupFound
+      groupAndTopicPartitions.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners))
+    }
   }
 
   def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
@@ -152,15 +180,18 @@ object ConsumerGroupCommand {
                             group: String,
                             topic: String,
                             channelSocketTimeoutMs: Int,
-                            channelRetryBackoffMs: Int) {
+                            channelRetryBackoffMs: Int,
+                            opts: ConsumerGroupCommandOptions) {
     val topicPartitions = getTopicPartitions(zkUtils, topic)
+    describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions)
+  }
+
+  def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = {
     val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
-    println("%s, %s, %s, %s, %s, %s, %s"
-      .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
     topicPartitions
       .sortBy { case topicPartition => topicPartition.partition }
       .foreach { topicPartition =>
-      describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
+      describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owners)
     }
   }
 
@@ -208,34 +239,84 @@ object ConsumerGroupCommand {
                                 group: String,
                                 topic: String,
                                 partition: Int,
-                                offsetOpt: Option[Long]) {
-    val topicAndPartition = TopicAndPartition(topic, partition)
+                                offsetOpt: Option[Long],
+                                opts: ConsumerGroupCommandOptions,
+                                owners: Map[TopicPartition, String] = null) {
+    val topicPartition = new TopicPartition(topic, partition)
     val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
+    val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt)
+    val owner = if (useNewConsumer) owners.get(new TopicPartition(topic, partition)) else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
+    def print(logEndOffset: Long): Unit = {
+      val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
+      println("%s, %s, %s, %s, %s, %s, %s"
+        .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
+    }
     zkUtils.getLeaderForPartition(topic, partition) match {
       case Some(-1) =>
         println("%s, %s, %s, %s, %s, %s, %s"
           .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
       case Some(brokerId) =>
-        val consumerOpt = getConsumer(zkUtils, brokerId)
-        consumerOpt match {
-          case Some(consumer) =>
-            val request =
-              OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-            val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-            consumer.close()
-
-            val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
-            println("%s, %s, %s, %s, %s, %s, %s"
-              .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
-          case None => // ignore
+        if (useNewConsumer) {
+          val consumerOpt = getNewConsumer(zkUtils, brokerId)
+          consumerOpt match {
+            case Some(consumer) =>
+              consumer.assign(List(topicPartition))
+              consumer.seekToEnd(topicPartition)
+              val logEndOffset = consumer.position(topicPartition)
+              consumer.close()
+              print(logEndOffset)
+            case None => // ignore
+          }
+        } else {
+          val consumerOpt = getZkConsumer(zkUtils, brokerId)
+          consumerOpt match {
+            case Some(consumer) =>
+              val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition())
+              val request =
+                OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+              val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+              consumer.close()
+              print(logEndOffset)
+            case None => // ignore
+          }
         }
       case None =>
-        println("No broker for partition %s".format(topicAndPartition))
+        println("No broker for partition %s".format(topicPartition))
+    }
+  }
+
+  private def getNewConsumer(zkUtils: ZkUtils, brokerId: Int): Option[KafkaConsumer[String, String]] = {
+    try {
+      zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+        case Some(brokerInfoString) =>
+          Json.parseFull(brokerInfoString) match {
+            case Some(m) =>
+              val brokerInfo = m.asInstanceOf[Map[String, Any]]
+              val host = brokerInfo.get("host").get.asInstanceOf[String]
+              val port = brokerInfo.get("port").get.asInstanceOf[Int]
+              val deserializer: String = (new StringDeserializer).getClass.getName
+              val properties: Properties = new Properties()
+              properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port)
+              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand")
+              properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+              properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
+              properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
+              properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
+              Some(new KafkaConsumer[String, String](properties))
+            case None =>
+              throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+          }
+        case None =>
+          throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
+      }
+    } catch {
+      case t: Throwable =>
+        println("Could not parse broker info due to " + t.getMessage)
+        None
     }
   }
 
-  private def getConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
+  private def getZkConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
     try {
       zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
         case Some(brokerInfoString) =>
@@ -261,6 +342,7 @@ object ConsumerGroupCommand {
   class ConsumerGroupCommandOptions(args: Array[String]) {
     val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
       "Multiple URLS can be given to allow fail-over."
+    val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
     val GroupDoc = "The consumer group we wish to act on."
     val TopicDoc = "The topic whose consumer group information should be deleted."
     val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
@@ -273,12 +355,17 @@ object ConsumerGroupCommand {
       "information for the given consumer groups. For instance --group g1 --group g2 --topic
t1" + nl +
       "Pass in just a topic to delete the given topic's partition offsets and ownership information
" +
       "for every consumer group. For instance --topic t1" + nl +
-      "WARNING: Only does deletions on consumer groups that are not active."
+      "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to
use it carefully to only delete groups that are not active."
+    val NewConsumerDoc = "Use new consumer."
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
                              .withRequiredArg
                              .describedAs("urls")
                              .ofType(classOf[String])
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+                                   .withRequiredArg
+                                   .describedAs("server to connect to")
+                                   .ofType(classOf[String])
     val groupOpt = parser.accepts("group", GroupDoc)
                          .withRequiredArg
                          .describedAs("consumer group")
@@ -294,13 +381,19 @@ object ConsumerGroupCommand {
     val listOpt = parser.accepts("list", ListDoc)
     val describeOpt = parser.accepts("describe", DescribeDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
+    val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
     val options = parser.parse(args : _*)
 
     val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
 
     def checkArgs() {
       // check required args
-      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      if (options.has(newConsumerOpt)) {
+        CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+        if (options.has(deleteOpt))
+          CommandLineUtils.printUsageAndDie(parser, "Option %s does not work with %s".format(deleteOpt, newConsumerOpt))
+      } else
+        CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
       if (options.has(describeOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
       if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3c0c04e/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 7d529ec..5b8cbc2 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -105,7 +105,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
       !consumers(0).assignment().isEmpty
     }, "Expected non-empty assignment")
 
-    val assignment = client.describeConsumerGroup(groupId)
+    val (_, assignment) = client.describeConsumerGroup(groupId)
     assertEquals(1, assignment.size)
     for (partitions <- assignment.values)
       assertEquals(Set(tp, tp2), partitions.toSet)


Mime
View raw message