kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-3144; Report members with no assigned partitions in ConsumerGroupCommand
Date Sat, 22 Oct 2016 02:35:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 74014af83 -> 7afdad8c3


KAFKA-3144; Report members with no assigned partitions in ConsumerGroupCommand

This PR makes a couple of enhancements to the `--describe` option of `ConsumerGroupCommand`:
1. Listing members with no assigned partitions.
2. Showing the member id along with the owner of each partition (owner is supposed to be the
logical application id and all members in the same group are supposed to set the same owner).
3. Printing a warning indicating whether ZooKeeper based or new consumer API based information
is being reported.

It also adds unit tests to verify the added functionality.

Note: The third request on the corresponding JIRA (listing active offsets for empty groups
of new consumers) is not implemented as part of this PR, and has been moved to its own JIRA
(KAFKA-3853).

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #1336 from vahidhashemian/KAFKA-3144


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7afdad8c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7afdad8c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7afdad8c

Branch: refs/heads/trunk
Commit: 7afdad8c37cde4a32ad7ce66ca7a878cf730495e
Parents: 74014af
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Fri Oct 21 19:19:04 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Oct 21 19:19:04 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    |  56 ++--
 .../kafka/admin/ConsumerGroupCommand.scala      | 297 ++++++++++++-------
 .../scala/kafka/coordinator/GroupMetadata.scala |   6 +-
 .../main/scala/kafka/tools/StreamsResetter.java |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  17 +-
 .../integration/kafka/api/AdminClientTest.scala |  30 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala | 128 ++++++++
 .../kafka/admin/ListConsumerGroupTest.scala     |  87 ++++++
 .../integration/ResetIntegrationTest.java       |   2 +-
 9 files changed, 463 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 556a02b..22a8abb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -17,7 +17,7 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.common.KafkaException
-import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
+import kafka.coordinator.{GroupOverview, MemberSummary}
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol,
RequestFuture}
@@ -121,44 +121,38 @@ class AdminClient(val time: Time,
     listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
   }
 
-  def describeGroup(groupId: String): GroupSummary = {
+  /**
+   * Case class used to represent a consumer of a consumer group
+   */
+  case class ConsumerSummary(consumerId: String,
+                             clientId: String,
+                             host: String,
+                             assignment: List[TopicPartition])
+
+  /**
+   * Case class used to represent group metadata (including the group coordinator) for the
DescribeGroup API
+   */
+  case class ConsumerGroupSummary(state: String,
+                                  assignmentStrategy: String,
+                                  consumers: Option[List[ConsumerSummary]],
+                                  coordinator: Node)
+
+  def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
     val coordinator = findCoordinator(groupId)
     val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
     val response = new DescribeGroupsResponse(responseBody)
-    val metadata = response.groups().get(groupId)
+    val metadata = response.groups.get(groupId)
     if (metadata == null)
       throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
+    if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType
!= ConsumerProtocol.PROTOCOL_TYPE)
+      throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}'
is not a valid consumer group")
 
     Errors.forCode(metadata.errorCode()).maybeThrow()
-    val members = metadata.members().map { member =>
-      val metadata = Utils.readBytes(member.memberMetadata())
-      val assignment = Utils.readBytes(member.memberAssignment())
-      MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata,
assignment)
+    val consumers = metadata.members.map { consumer =>
+      val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
+      ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, assignment.partitions.toList)
     }.toList
-    GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
-  }
-
-  case class ConsumerSummary(memberId: String,
-                             clientId: String,
-                             clientHost: String,
-                             assignment: List[TopicPartition])
-
-  def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = {
-    val group = describeGroup(groupId)
-    if (group.state == "Dead")
-      return None
-
-    if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
-      throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}'
is not a valid consumer group")
-
-    if (group.state == "Stable") {
-      Some(group.members.map { member =>
-        val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
-        new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
-      })
-    } else {
-      Some(List.empty)
-    }
+    ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
   }
 
   def close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 5de2d26..354e6a2 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -28,8 +28,8 @@ import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.BrokerNotAvailableException
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
@@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConverters._
 import scala.collection.{Set, mutable}
 
-object ConsumerGroupCommand {
+object ConsumerGroupCommand extends Logging {
 
   def main(args: Array[String]) {
     val opts = new ConsumerGroupCommandOptions(args)
@@ -55,77 +55,140 @@ object ConsumerGroupCommand {
 
     val consumerGroupService = {
       if (opts.useOldConsumer) {
+        System.err.println("Note: This will only show information about consumers that use
ZooKeeper (not those using the Java consumer API).\n")
         new ZkConsumerGroupService(opts)
       } else {
+        System.err.println("Note: This will only show information about consumers that use
the Java consumer API (non-ZooKeeper-based consumers).\n")
         new KafkaConsumerGroupService(opts)
       }
     }
 
     try {
       if (opts.options.has(opts.listOpt))
-        consumerGroupService.list()
-      else if (opts.options.has(opts.describeOpt))
-        consumerGroupService.describe()
+        consumerGroupService.listGroups().foreach(println(_))
+      else if (opts.options.has(opts.describeOpt)) {
+        val (state, assignments) = consumerGroupService.describeGroup()
+        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
+        assignments match {
+          case None =>
+            printError(s"The consumer group '$groupId' does not exist.")
+          case Some(assignments) =>
+            if (assignments.isEmpty)
+              state match {
+                case Some("Dead") =>
+                  printError(s"Consumer group '$groupId' does not exist.")
+                case Some("Empty") =>
+                  printError(s"Consumer group '$groupId' has no active members.")
+                case Some(_) =>
+                  printError(s"Consumer group '$groupId' is rebalancing.")
+                case None =>
+                  // the control should never reach here
+                  throw new KafkaException("Expected a valid consumer group state, but none
found.")
+              }
+            else
+              printAssignment(assignments, !opts.useOldConsumer)
+        }
+      }
       else if (opts.options.has(opts.deleteOpt)) {
         consumerGroupService match {
-          case service: ZkConsumerGroupService => service.delete()
-          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
+          case service: ZkConsumerGroupService => service.deleteGroups()
+          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
         }
       }
     } catch {
       case e: Throwable =>
-        println("Error while executing consumer group command " + e.getMessage)
-        println(Utils.stackTrace(e))
+        printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
     } finally {
       consumerGroupService.close()
     }
   }
 
+  def printError(msg: String, e: Option[Throwable] = None): Unit = {
+    println(s"Error: $msg")
+    e.foreach(debug("Exception in consumer group command", _))
+  }
+
+  def printAssignment(groupAssignment: Seq[PartitionAssignmentState], useNewConsumer: Boolean):
Unit = {
+    print("\n%-30s %-10s %-15s %-15s %-10s %-50s".format("TOPIC", "PARTITION", "CURRENT-OFFSET",
"LOG-END-OFFSET", "LAG", "CONSUMER-ID"))
+    if (useNewConsumer)
+      print("%-30s %s".format("HOST", "CLIENT-ID"))
+    println()
+
+    groupAssignment.foreach { consumerAssignment =>
+      print("%-30s %-10s %-15s %-15s %-10s %-50s".format(
+        consumerAssignment.topic.getOrElse("-"), consumerAssignment.partition.getOrElse("-"),
+        consumerAssignment.offset.getOrElse("-"), consumerAssignment.logEndOffset.getOrElse("-"),
+        consumerAssignment.lag.getOrElse("-"), consumerAssignment.consumerId.getOrElse("-")))
+      if (useNewConsumer)
+        print("%-30s %s".format(consumerAssignment.host.getOrElse("-"), consumerAssignment.clientId.getOrElse("-")))
+      println()
+    }
+  }
+
+  protected case class PartitionAssignmentState(group: String, coordinator: Option[Node],
topic: Option[String],
+                                                partition: Option[Int], offset: Option[Long],
lag: Option[Long],
+                                                consumerId: Option[String], host: Option[String],
+                                                clientId: Option[String], logEndOffset: Option[Long])
+
   sealed trait ConsumerGroupService {
 
-    def list(): Unit
+    def listGroups(): List[String]
 
-    def describe() {
-      describeGroup(opts.options.valueOf(opts.groupOpt))
+    def describeGroup(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
+      collectGroupAssignment(opts.options.valueOf(opts.groupOpt))
     }
 
     def close(): Unit
 
     protected def opts: ConsumerGroupCommandOptions
 
-    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult
-
-    protected def describeGroup(group: String): Unit
-
-    protected def describeTopicPartition(group: String,
-                                         topicPartitions: Seq[TopicAndPartition],
-                                         getPartitionOffset: TopicAndPartition => Option[Long],
-                                         getOwner: TopicAndPartition => Option[String]):
Unit = {
-      topicPartitions
-        .sortBy { case topicPartition => topicPartition.partition }
-        .foreach { topicPartition =>
-          describePartition(group, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition),
-            getOwner(topicPartition))
-        }
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult
+
+    protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]])
+
+    protected def collectConsumerAssignment(group: String,
+                                            coordinator: Option[Node],
+                                            topicPartitions: Seq[TopicAndPartition],
+                                            getPartitionOffset: TopicAndPartition => Option[Long],
+                                            consumerIdOpt: Option[String],
+                                            hostOpt: Option[String],
+                                            clientIdOpt: Option[String]): Array[PartitionAssignmentState]
= {
+      if (topicPartitions.isEmpty)
+        Array[PartitionAssignmentState](
+          PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None),
consumerIdOpt, hostOpt, clientIdOpt, None)
+        )
+      else {
+        var assignmentRows: Array[PartitionAssignmentState] = Array()
+        topicPartitions
+          .sortBy(_.partition)
+          .foreach { topicPartition =>
+            assignmentRows = assignmentRows :+ describePartition(group, coordinator, topicPartition.topic,
topicPartition.partition, getPartitionOffset(topicPartition),
+              consumerIdOpt, hostOpt, clientIdOpt)
+          }
+        assignmentRows
+      }
     }
 
-    protected def printDescribeHeader() {
-      println("%-30s %-30s %-10s %-15s %-15s %-15s %s".format("GROUP", "TOPIC", "PARTITION",
"CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "OWNER"))
-    }
+    protected def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long]
=
+      offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
 
     private def describePartition(group: String,
+                                  coordinator: Option[Node],
                                   topic: String,
                                   partition: Int,
                                   offsetOpt: Option[Long],
-                                  ownerOpt: Option[String]) {
-      def print(logEndOffset: Option[Long]): Unit = {
-        val lag = offsetOpt.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
-        println("%-30s %-30s %-10s %-15s %-15s %-15s %s".format(group, topic, partition,
offsetOpt.getOrElse("unknown"), logEndOffset.getOrElse("unknown"), lag.getOrElse("unknown"),
ownerOpt.getOrElse("none")))
-      }
-      getLogEndOffset(topic, partition) match {
-        case LogEndOffsetResult.LogEndOffset(logEndOffset) => print(Some(logEndOffset))
-        case LogEndOffsetResult.Unknown => print(None)
-        case LogEndOffsetResult.Ignore =>
+                                  consumerIdOpt: Option[String],
+                                  hostOpt: Option[String],
+                                  clientIdOpt: Option[String]): PartitionAssignmentState
= {
+      def getDescribePartitionResult(logEndOffsetOpt: Option[Long]): PartitionAssignmentState
=
+        PartitionAssignmentState(group, coordinator, Option(topic), Option(partition), offsetOpt,
+                                 getLag(offsetOpt, logEndOffsetOpt), consumerIdOpt, hostOpt,
+                                 clientIdOpt, logEndOffsetOpt)
+
+      getLogEndOffset(new TopicPartition(topic, partition)) match {
+        case LogEndOffsetResult.LogEndOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
+        case LogEndOffsetResult.Unknown => getDescribePartitionResult(None)
+        case LogEndOffsetResult.Ignore => null
       }
     }
 
@@ -142,11 +205,11 @@ object ConsumerGroupCommand {
       zkUtils.close()
     }
 
-    def list() {
-      zkUtils.getConsumerGroups().foreach(println)
+    def listGroups(): List[String] = {
+      zkUtils.getConsumerGroups().toList
     }
 
-    def delete() {
+    def deleteGroups() {
       if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt))
         deleteForTopic()
       else if (opts.options.has(opts.groupOpt))
@@ -155,51 +218,72 @@ object ConsumerGroupCommand {
         deleteAllForTopic()
     }
 
-    protected def describeGroup(group: String) {
+    protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]])
= {
       val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else new Properties()
       val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt
       val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMsOpt", "300").toInt
+      if (!zkUtils.getConsumerGroups().contains(group))
+        return (None, None)
+
       val topics = zkUtils.getTopicsByConsumerGroup(group)
-      if (topics.isEmpty)
-        println("No topic available for consumer group provided")
-      printDescribeHeader()
-      topics.foreach(topic => describeTopic(group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
-    }
+      val topicPartitions = getAllTopicPartitions(topics)
+      var groupConsumerIds = zkUtils.getConsumersInGroup(group)
+
+      // mapping of topic partition -> consumer id
+      val consumerIdByTopicPartition = topicPartitions.map { topicPartition =>
+        val owner = zkUtils.readDataMaybeNull(new ZKGroupTopicDirs(group, topicPartition.topic).consumerOwnerDir
+ "/" + topicPartition.partition)._1
+        var consumerId = ""
+        owner.foreach(o => consumerId = o.substring(0, o.lastIndexOf('-')))
+        topicPartition -> consumerId
+      }.toMap
+
+      // mapping of consumer id -> list of topic partitions
+      val consumerTopicPartitions = consumerIdByTopicPartition groupBy{_._2} map {
+        case (key, value) => (key, value.unzip._1.toArray) }
+
+      // mapping of consumer id -> list of subscribed topics
+      val topicsByConsumerId = zkUtils.getTopicsPerMemberId(group)
+
+      var assignmentRows = topicPartitions.flatMap { topicPartition =>
+        val partitionOffsets = getPartitionOffsets(group, List(topicPartition), channelSocketTimeoutMs,
channelRetryBackoffMs)
+        val consumerId = consumerIdByTopicPartition.get(topicPartition)
+        // since consumer id is repeated in client id, leave host and client id empty
+        consumerId.foreach(id => groupConsumerIds = groupConsumerIds.filterNot(_ == id))
+        collectConsumerAssignment(group, None, List(topicPartition), partitionOffsets.get,
consumerId, None, None)
+      }
 
-    private def describeTopic(group: String,
-                              topic: String,
-                              channelSocketTimeoutMs: Int,
-                              channelRetryBackoffMs: Int) {
-      val topicPartitions = getTopicPartitions(topic)
-      val groupDirs = new ZKGroupTopicDirs(group, topic)
-      val ownerByTopicPartition = topicPartitions.flatMap { topicPartition =>
-        zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + topicPartition.partition)._1.map
{ owner =>
-          topicPartition -> owner
+      assignmentRows ++= groupConsumerIds.sortBy(- consumerTopicPartitions.get(_).size).flatMap
{ consumerId =>
+        topicsByConsumerId(consumerId).flatMap { topic =>
+          // since consumers with no topic partitions are processed here, we pass empty for
topic partitions and offsets
+          // since consumer id is repeated in client id, leave host and client id empty
+          collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition,
Option[Long]](), Some(consumerId), None, None)
         }
-      }.toMap
-      val partitionOffsets = getPartitionOffsets(group, topicPartitions, channelSocketTimeoutMs,
channelRetryBackoffMs)
-      describeTopicPartition(group, topicPartitions, partitionOffsets.get, ownerByTopicPartition.get)
+      }
+
+      (None, Some(assignmentRows))
     }
 
-    private def getTopicPartitions(topic: String): Seq[TopicAndPartition] = {
-      val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
-      val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
-      partitions.map(TopicAndPartition(topic, _))
+    private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicAndPartition] = {
+      val topicPartitionMap = zkUtils.getPartitionsForTopics(topics)
+      topics.flatMap { topic =>
+        val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
+        partitions.map(TopicAndPartition(topic, _))
+      }
     }
 
-    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
-      zkUtils.getLeaderForPartition(topic, partition) match {
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = {
+      zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match
{
         case Some(-1) => LogEndOffsetResult.Unknown
         case Some(brokerId) =>
           getZkConsumer(brokerId).map { consumer =>
-            val topicAndPartition = new TopicAndPartition(topic, partition)
+            val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
             val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime,
1)))
             val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
             consumer.close()
             LogEndOffsetResult.LogEndOffset(logEndOffset)
           }.getOrElse(LogEndOffsetResult.Ignore)
         case None =>
-          println(s"No broker for partition ${new TopicPartition(topic, partition)}")
+          printError(s"No broker for partition '$topicPartition'")
           LogEndOffsetResult.Ignore
       }
     }
@@ -223,15 +307,13 @@ object ConsumerGroupCommand {
             offsetMap.put(topicAndPartition, offset)
           } catch {
             case z: ZkNoNodeException =>
-              println("Could not fetch offset from zookeeper for group %s partition %s due
to missing offset data in zookeeper."
-                .format(group, topicAndPartition))
+              printError(s"Could not fetch offset from zookeeper for group '$group' partition
'$topicAndPartition' due to missing offset data in zookeeper.", Some(z))
           }
         }
         else if (offsetAndMetadata.error == Errors.NONE.code)
           offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
         else
-          println("Could not fetch offset from kafka for group %s partition %s due to %s."
-            .format(group, topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception))
+          printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition'
due to ${Errors.forCode(offsetAndMetadata.error).exception}.")
       }
       channel.disconnect()
       offsetMap.toMap
@@ -242,13 +324,13 @@ object ConsumerGroupCommand {
       groups.asScala.foreach { group =>
         try {
           if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
-            println("Deleted all consumer group information for group %s in zookeeper.".format(group))
+            println(s"Deleted all consumer group information for group '$group' in zookeeper.")
           else
-            println("Delete for group %s failed because its consumers are still active.".format(group))
+            printError(s"Delete for group '$group' failed because its consumers are still
active.")
         }
         catch {
           case e: ZkNoNodeException =>
-            println("Delete for group %s failed because group does not exist.".format(group))
+            printError(s"Delete for group '$group' failed because group does not exist.",
Some(e))
         }
       }
     }
@@ -260,13 +342,13 @@ object ConsumerGroupCommand {
       groups.asScala.foreach { group =>
         try {
           if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
-            println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group,
topic))
+            println(s"Deleted consumer group information for group '$group' topic '$topic'
in zookeeper.")
           else
-            println("Delete for group %s topic %s failed because its consumers are still
active.".format(group, topic))
+            printError(s"Delete for group '$group' topic '$topic' failed because its consumers
are still active.")
         }
         catch {
           case e: ZkNoNodeException =>
-            println("Delete for group %s topic %s failed because group does not exist.".format(group,
topic))
+            printError(s"Delete for group '$group' topic '$topic' failed because group does
not exist.", Some(e))
         }
       }
     }
@@ -275,7 +357,7 @@ object ConsumerGroupCommand {
       val topic = opts.options.valueOf(opts.topicOpt)
       Topic.validate(topic)
       AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
-      println("Deleted consumer group information for all inactive consumer groups for topic
%s in zookeeper.".format(topic))
+      println(s"Deleted consumer group information for all inactive consumer groups for topic
'$topic' in zookeeper.")
     }
 
     private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
@@ -286,7 +368,7 @@ object ConsumerGroupCommand {
           .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)))
       } catch {
         case t: Throwable =>
-          println("Could not parse broker info due to " + t.getMessage)
+          printError(s"Could not parse broker info due to ${t.getMessage}", Some(t))
           None
       }
     }
@@ -300,36 +382,39 @@ object ConsumerGroupCommand {
     // `consumer` is only needed for `describe`, so we instantiate it lazily
     private var consumer: KafkaConsumer[String, String] = null
 
-    def list() {
-      adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
+    def listGroups(): List[String] = {
+      adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
     }
 
-    protected def describeGroup(group: String) {
-      adminClient.describeConsumerGroup(group) match {
-        case None => println(s"Consumer group `${group}` does not exist.")
-        case Some(consumerSummaries) =>
-          if (consumerSummaries.isEmpty)
-            println(s"Consumer group `${group}` is rebalancing.")
-          else {
-            val consumer = getConsumer()
-            printDescribeHeader()
-            consumerSummaries.foreach { consumerSummary =>
-              val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic,
tp.partition))
-              val partitionOffsets = topicPartitions.flatMap { topicPartition =>
-                Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map
{ offsetAndMetadata =>
-                  topicPartition -> offsetAndMetadata.offset
-                }
-              }.toMap
-              describeTopicPartition(group, topicPartitions, partitionOffsets.get,
-                _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
+    protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]])
= {
+      val consumerGroupSummary = adminClient.describeConsumerGroup(group)
+      (Some(consumerGroupSummary.state),
+        consumerGroupSummary.consumers match {
+          case None =>
+            None
+          case Some(consumers) =>
+            if (consumers.isEmpty)
+              Some(Array[PartitionAssignmentState]())
+            else {
+              val consumer = getConsumer()
+              Some(consumers.sortWith(_.assignment.size > _.assignment.size).flatMap {
consumerSummary =>
+                val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic,
tp.partition))
+                val partitionOffsets = topicPartitions.flatMap { topicPartition =>
+                  Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map
{ offsetAndMetadata =>
+                    topicPartition -> offsetAndMetadata.offset
+                  }
+                }.toMap
+                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator),
topicPartitions,
+                  partitionOffsets.get, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
+                  Some(s"${consumerSummary.clientId}"))
+              })
             }
-          }
-      }
+        }
+      )
     }
 
-    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = {
       val consumer = getConsumer()
-      val topicPartition = new TopicPartition(topic, partition)
       consumer.assign(List(topicPartition).asJava)
       consumer.seekToEnd(List(topicPartition).asJava)
       val logEndOffset = consumer.position(topicPartition)
@@ -431,14 +516,14 @@ object ConsumerGroupCommand {
       // check required args
       if (useOldConsumer) {
         if (options.has(bootstrapServerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid
with $zkConnectOpt.")
+          CommandLineUtils.printUsageAndDie(parser, s"Option '$bootstrapServerOpt' is not
valid with '$zkConnectOpt'.")
         else if (options.has(newConsumerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid
with $zkConnectOpt.")
+          CommandLineUtils.printUsageAndDie(parser, s"Option '$newConsumerOpt' is not valid
with '$zkConnectOpt'.")
       } else {
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
         if (options.has(deleteOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with
$zkConnectOpt. Note that " +
+          CommandLineUtils.printUsageAndDie(parser, s"Option '$deleteOpt' is only valid with
'$zkConnectOpt'. Note that " +
             "there's no need to delete group metadata for the new consumer as the group is
deleted when the last " +
             "committed offset for that group expires.")
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index c86c7f8..6b889f4 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -17,14 +17,12 @@
 
 package kafka.coordinator
 
-import kafka.utils.nonthreadsafe
+import collection.mutable
 import java.util.UUID
-
 import kafka.common.OffsetAndMetadata
+import kafka.utils.nonthreadsafe
 import org.apache.kafka.common.TopicPartition
 
-import collection.mutable
-
 private[coordinator] sealed trait GroupState { def state: Byte }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 7153790..8392f66 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -90,7 +90,7 @@ public class StreamsResetter {
 
             adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
             final String groupId = this.options.valueOf(applicationIdOption);
-            if (!adminClient.describeGroup(groupId).members().isEmpty()) {
+            if (!adminClient.describeConsumerGroup(groupId).consumers().isEmpty()) {
                 throw new IllegalStateException("Consumer group '" + groupId + "' is still
active. " +
                     "Make sure to stop all running application instances before running the
reset tool.");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e5b1b6a..80a9f1a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -653,7 +653,7 @@ class ZkUtils(val zkClient: ZkClient,
     zkClient.exists(path)
   }
 
-  def getCluster() : Cluster = {
+  def getCluster(): Cluster = {
     val cluster = new Cluster
     val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
     for (node <- nodes) {
@@ -783,7 +783,7 @@ class ZkUtils(val zkClient: ZkClient,
     getChildren(dirs.consumerRegistryDir)
   }
 
-  def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String,
List[ConsumerThreadId]] = {
+  def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String,
List[ConsumerThreadId]] = {
     val dirs = new ZKGroupDirs(group)
     val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
     val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
@@ -802,6 +802,15 @@ class ZkUtils(val zkClient: ZkClient,
     consumersPerTopicMap
   }
 
+  def getTopicsPerMemberId(group: String, excludeInternalTopics: Boolean = true): Map[String,
List[String]] = {
+    val dirs = new ZKGroupDirs(group)
+    val memberIds = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
+    memberIds.map { memberId =>
+      val topicCount = TopicCount.constructTopicCount(group, memberId, this, excludeInternalTopics)
+      memberId -> topicCount.getTopicCountMap.keys.toList
+    }.toMap
+  }
+
   /**
    * This API takes in a broker id, queries zookeeper for the broker metadata and returns
the metadata for that broker
    * or throws an exception if the broker dies before the query to zookeeper finishes
@@ -891,10 +900,10 @@ class ZkUtils(val zkClient: ZkClient,
 private object ZKStringSerializer extends ZkSerializer {
 
   @throws(classOf[ZkMarshallingError])
-  def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
+  def serialize(data : Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
 
   @throws(classOf[ZkMarshallingError])
-  def deserialize(bytes : Array[Byte]) : Object = {
+  def deserialize(bytes : Array[Byte]): Object = {
     if (bytes == null)
       null
     else

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 891a72c..ce91a30 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -66,7 +66,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
     consumers.head.subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
       consumers.head.poll(0)
-      !consumers.head.assignment().isEmpty
+      !consumers.head.assignment.isEmpty
     }, "Expected non-empty assignment")
 
     val groups = client.listAllGroupsFlattened
@@ -77,23 +77,22 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
-  def testDescribeGroup() {
+  def testGetConsumerGroupSummary() {
     consumers.head.subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
       consumers.head.poll(0)
-      !consumers.head.assignment().isEmpty
+      !consumers.head.assignment.isEmpty
     }, "Expected non-empty assignment")
 
-    val group = client.describeGroup(groupId)
-    assertEquals("consumer", group.protocolType)
-    assertEquals("range", group.protocol)
+    val group = client.describeConsumerGroup(groupId)
+    assertEquals("range", group.assignmentStrategy)
     assertEquals("Stable", group.state)
-    assertFalse(group.members.isEmpty)
+    assertFalse(group.consumers.isEmpty)
 
-    val member = group.members.head
+    val member = group.consumers.get.head
     assertEquals(clientId, member.clientId)
-    assertFalse(member.clientHost.isEmpty)
-    assertFalse(member.memberId.isEmpty)
+    assertFalse(member.host.isEmpty)
+    assertFalse(member.consumerId.isEmpty)
   }
 
   @Test
@@ -101,17 +100,18 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
     consumers.head.subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
       consumers.head.poll(0)
-      !consumers.head.assignment().isEmpty
+      !consumers.head.assignment.isEmpty
     }, "Expected non-empty assignment")
 
-    val consumerSummaries = client.describeConsumerGroup(groupId)
-    assertEquals(1, consumerSummaries.size)
-    assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet))
+    val consumerGroupSummary = client.describeConsumerGroup(groupId)
+    assertEquals(1, consumerGroupSummary.consumers.get.size)
+    assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
   }
 
   @Test
   def testDescribeConsumerGroupForNonExistentGroup() {
     val nonExistentGroup = "non" + groupId
-    assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).isEmpty)
+    val sum = client.describeConsumerGroup(nonExistentGroup).consumers
+    assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
new file mode 100644
index 0000000..3691919
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.util.Properties
+
+import org.easymock.EasyMock
+import org.junit.Before
+import org.junit.Test
+
+import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
+import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
+import kafka.consumer.OldConsumer
+import kafka.consumer.Whitelist
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+
+
+class DescribeConsumerGroupTest extends KafkaServerTestHarness {
+
+  val overridingProps = new Properties()
+  val topic = "foo"
+  val topicFilter = new Whitelist(topic)
+  val group = "test.group"
+  val props = new Properties
+
+  // configure the servers and clients
+  override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown
= false).map(KafkaConfig.fromProps(_, overridingProps))
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    props.setProperty("group.id", group)
+    props.setProperty("zookeeper.connect", zkConnect)
+  }
+
+  @Test
+  def testDescribeNonExistingGroup() {
+    // mocks
+    val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter,
props).createMock()
+
+    // stubs
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe",
"--group", "missing.group"))
+    val consumerGroupCommand = new ZkConsumerGroupService(opts)
+
+    // simulation
+    EasyMock.replay(consumerMock)
+
+    // action/test
+    TestUtils.waitUntilTrue(() => {
+        !consumerGroupCommand.describeGroup()._2.isDefined
+      }, "Expected no rows in describe group results.")
+
+    // cleanup
+    consumerGroupCommand.close()
+    consumerMock.stop()
+  }
+
+  @Test
+  def testDescribeExistingGroup() {
+    // mocks
+    val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter,
props).createMock()
+
+    // stubs
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe",
"--group", group))
+    val consumerGroupCommand = new ZkConsumerGroupService(opts)
+
+    // simulation
+    EasyMock.replay(consumerMock)
+
+    // action/test
+    TestUtils.waitUntilTrue(() => {
+        val (state, assignments) = consumerGroupCommand.describeGroup()
+        assignments.isDefined &&
+        assignments.get.filter(_.group == group).size == 1 &&
+        assignments.get.filter(_.group == group).head.consumerId.isDefined
+      }, "Expected rows and a member id column in describe group results.")
+
+    // cleanup
+    consumerGroupCommand.close()
+    consumerMock.stop()
+  }
+
+  @Test
+  def testDescribeConsumersWithNoAssignedPartitions() {
+    // mocks
+    val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter,
props).createMock()
+    val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter,
props).createMock()
+
+    // stubs
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe",
"--group", group))
+    val consumerGroupCommand = new ZkConsumerGroupService(opts)
+
+    EasyMock.replay(consumer1Mock)
+    EasyMock.replay(consumer2Mock)
+
+    // action/test
+    TestUtils.waitUntilTrue(() => {
+        val (state, assignments) = consumerGroupCommand.describeGroup()
+        assignments.isDefined &&
+        assignments.get.filter(_.group == group).size == 2 &&
+        assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size
== 1 &&
+        assignments.get.filter{ x => x.group == group && !x.partition.isDefined}.size
== 1
+      }, "Expected rows for consumers with no assigned partitions in describe group results.")
+
+    // cleanup
+    consumerGroupCommand.close()
+    consumer1Mock.stop()
+    consumer2Mock.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
new file mode 100644
index 0000000..f4494c7
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.util.Properties
+
+import org.easymock.EasyMock
+import org.junit.Before
+import org.junit.Test
+
+import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
+import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
+import kafka.consumer.OldConsumer
+import kafka.consumer.Whitelist
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+
+
+class ListConsumerGroupTest extends KafkaServerTestHarness {
+
+  val overridingProps = new Properties()
+  val topic = "foo"
+  val topicFilter = new Whitelist(topic)
+  val group = "test.group"
+  val props = new Properties
+
+  // configure the servers and clients
+  override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown
= false).map(KafkaConfig.fromProps(_, overridingProps))
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    props.setProperty("group.id", group)
+    props.setProperty("zookeeper.connect", zkConnect)
+  }
+
+  @Test
+  def testListGroupWithNoExistingGroup() {
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect))
+    val consumerGroupCommand = new ZkConsumerGroupService(opts)
+    assert(consumerGroupCommand.listGroups().isEmpty)
+  }
+
+  @Test
+  def testListGroupWithSomeGroups() {
+    // mocks
+    val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter,
props).createMock()
+    props.setProperty("group.id", "some.other.group")
+    val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter,
props).createMock()
+
+    // stubs
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect))
+    val consumerGroupCommand = new ZkConsumerGroupService(opts)
+
+    // simulation
+    EasyMock.replay(consumer1Mock)
+    EasyMock.replay(consumer2Mock)
+
+    // action/test
+    TestUtils.waitUntilTrue(() => {
+        val groups = consumerGroupCommand.listGroups()
+        groups.size == 2 && groups.contains(group)
+      }, "Expected a different list group results.")
+
+    // cleanup
+    consumerGroupCommand.close()
+    consumer1Mock.stop()
+    consumer2Mock.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7afdad8c/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 5847fb1..ced1109 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -300,7 +300,7 @@ public class ResetIntegrationTest {
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {
         @Override
         public boolean conditionMet() {
-            return adminClient.describeGroup(APP_ID).members().isEmpty();
+            return adminClient.describeConsumerGroup(APP_ID).consumers().isEmpty();
         }
     }
 


Mime
View raw message