kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2760: Clean up interface of AdminClient.describeConsumerGroup.
Date Fri, 06 Nov 2015 21:25:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c006c5916 -> a423316d3


KAFKA-2760: Clean up interface of AdminClient.describeConsumerGroup(groupId)

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #442 from SinghAsDev/KAFKA-2760


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a423316d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a423316d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a423316d

Branch: refs/heads/trunk
Commit: a423316d388c386aa905b3d195487370f8703787
Parents: c006c59
Author: Ashish Singh <asingh@cloudera.com>
Authored: Fri Nov 6 13:31:38 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Nov 6 13:31:38 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    | 38 +++++---------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 14 ++++----
 .../integration/kafka/api/AdminClientTest.scala |  7 ++--
 3 files changed, 19 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a423316d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 1c4aa52..4618127 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -147,42 +147,22 @@ class AdminClient(val time: Time,
     GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
   }
 
-  def describeConsumerGroup(groupId: String): (Map[TopicPartition, String], Map[String, List[TopicPartition]]) = {
-    val group = describeGroup(groupId)
-    try {
-      val membersAndTopicPartitions: Map[String, List[TopicPartition]] = getMembersAndTopicPartitions(group)
-      val owners = getOwners(group)
-      (owners, membersAndTopicPartitions)
-    } catch {
-      case (ex: IllegalArgumentException) =>
-        throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group.")
-    }
-  }
+  case class ConsumerSummary(
+                              memberId: String,
+                              clientId: String,
+                              clientHost: String,
+                              assignment: List[TopicPartition])
 
-  def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = {
+  def describeConsumerGroup(groupId: String): List[ConsumerSummary] = {
+    val group = describeGroup(groupId)
     if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
       throw new IllegalArgumentException(s"${group} is not a valid GroupSummary")
 
     group.members.map {
       case member =>
         val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
-        member.memberId -> assignment.partitions().asScala.toList
-    }.toMap
-  }
-
-  def getOwners(groupSummary: GroupSummary): Map[TopicPartition, String] = {
-    if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
-      throw new IllegalArgumentException(s"${groupSummary} is not a valid GroupSummary")
-
-    groupSummary.members.flatMap {
-      case member =>
-        val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
-        val partitions = assignment.partitions().asScala.toList
-        partitions.map {
-          case partition: TopicPartition =>
-            partition -> "%s_%s".format(member.memberId, member.clientHost)
-        }.toMap
-    }.toMap
+        new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a423316d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 91dc4e3..b682812 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -106,11 +106,11 @@ object ConsumerGroupCommand {
       }
      topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
     } else {
-      val (owners, groupAndTopicPartitions) = createAndGetAdminClient(opts).describeConsumerGroup(group)
+      val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group)
 
-      if (groupAndTopicPartitions.isEmpty)
+      if (consumers.isEmpty)
         warnNoTopicsForGroupFound
-      groupAndTopicPartitions.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners))
+      consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
     }
   }
 
@@ -186,12 +186,12 @@ object ConsumerGroupCommand {
     describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions)
   }
 
-  def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = {
+  def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owner: Option[String] = None): Unit = {
     val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
     topicPartitions
       .sortBy { case topicPartition => topicPartition.partition }
       .foreach { topicPartition =>
-      describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owners)
+      describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owner)
     }
   }
 
@@ -241,11 +241,11 @@ object ConsumerGroupCommand {
                                 partition: Int,
                                 offsetOpt: Option[Long],
                                 opts: ConsumerGroupCommandOptions,
-                                owners: Map[TopicPartition, String] = null) {
+                                ownerOpt: Option[String] = None) {
     val topicPartition = new TopicPartition(topic, partition)
     val groupDirs = new ZKGroupTopicDirs(group, topic)
     val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt)
-    val owner = if (useNewConsumer) owners.get(new TopicPartition(topic, partition)) else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
+    val owner: Option[String] = if (useNewConsumer) ownerOpt else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
     def print(logEndOffset: Long): Unit = {
       val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
       println("%s, %s, %s, %s, %s, %s, %s"

http://git-wip-us.apache.org/repos/asf/kafka/blob/a423316d/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 5b8cbc2..072f8eb 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -105,10 +105,9 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
       !consumers(0).assignment().isEmpty
     }, "Expected non-empty assignment")
 
-    val (_, assignment) = client.describeConsumerGroup(groupId)
-    assertEquals(1, assignment.size)
-    for (partitions <- assignment.values)
-      assertEquals(Set(tp, tp2), partitions.toSet)
+    val consumerSummaries = client.describeConsumerGroup(groupId)
+    assertEquals(1, consumerSummaries.size)
+    assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet)
   }
 
   @Test


Mime
View raw message