kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2768: AdminClient ignore member list for non-stable groups
Date Fri, 13 Nov 2015 20:50:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 002ec9c79 -> a26dbcdf3


KAFKA-2768: AdminClient ignore member list for non-stable groups

…stabilizing

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ismael Juma, Jason Gustafson, Guozhang Wang

Closes #447 from SinghAsDev/KAFKA-2768


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a26dbcdf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a26dbcdf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a26dbcdf

Branch: refs/heads/trunk
Commit: a26dbcdf3ac57515cbc9a1c39d8c27e90f74188a
Parents: 002ec9c
Author: Ashish Singh <asingh@cloudera.com>
Authored: Fri Nov 13 12:56:15 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Nov 13 12:56:15 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminClient.scala   | 16 +++++++++-------
 .../scala/kafka/admin/ConsumerGroupCommand.scala    | 16 ++++++++--------
 2 files changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a26dbcdf/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index ff1d3fe..1dea28b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -138,11 +138,10 @@ class AdminClient(val time: Time,
       throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
 
     Errors.forCode(metadata.errorCode()).maybeThrow()
-    val members = metadata.members().map {
-      case member =>
-        val metadata = Utils.readBytes(member.memberMetadata())
-        val assignment = Utils.readBytes(member.memberAssignment())
-        MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
+    val members = metadata.members().map { member =>
+      val metadata = Utils.readBytes(member.memberMetadata())
+      val assignment = Utils.readBytes(member.memberAssignment())
+      MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
     }.toList
     GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
   }
@@ -161,10 +160,13 @@ class AdminClient(val time: Time,
     if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
       throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
 
-    group.members.map {
-      case member =>
+    if (group.state == "Stable") {
+      group.members.map { member =>
         val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
         new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
+      }
+    } else {
+      List.empty
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a26dbcdf/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index b682812..c29efe4 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -92,9 +92,6 @@ object ConsumerGroupCommand {
     val configs = parseConfigs(opts)
     val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
     val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
-    def warnNoTopicsForGroupFound: Unit = {
-      println("No topic available for consumer group provided")
-    }
 
     println("%s, %s, %s, %s, %s, %s, %s"
       .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
@@ -102,15 +99,18 @@ object ConsumerGroupCommand {
     if (!useNewConsumer) {
       val topics = zkUtils.getTopicsByConsumerGroup(group)
       if (topics.isEmpty) {
-        warnNoTopicsForGroupFound
+        println("No topic available for consumer group provided")
+      } else {
+        topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
       }
-      topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
     } else {
       val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group)
 
-      if (consumers.isEmpty)
-        warnNoTopicsForGroupFound
-      consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
+      if (consumers.isEmpty) {
+        println(s"Consumer group, ${group}, does not exist or is rebalancing.")
+      } else {
+        consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
+      }
     }
   }
 


Mime
View raw message