kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7471: Multiple Consumer Group Management Feature (#5726)
Date Mon, 15 Apr 2019 23:53:40 GMT
This is an automated email from the ASF dual-hosted git repository.

vahid pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47a9871  KAFKA-7471: Multiple Consumer Group Management Feature (#5726)
47a9871 is described below

commit 47a9871ef65a13f9d58d5ea216de340f7e123da5
Author: Alex Dunayevsky <rootex-@users.noreply.github.com>
AuthorDate: Tue Apr 16 02:53:28 2019 +0300

    KAFKA-7471: Multiple Consumer Group Management Feature (#5726)
    
    * Describe/Delete/Reset offsets on multiple consumer groups at a time (including each group by repeating `--group` parameter)
    * Describe/Delete/Reset offsets on ALL consumer groups at a time (add new `--all-groups` option similar to `--all-topics`)
    * Reset plan CSV file generation reworked: the structure is updated to support multiple consumer groups, and CSV generation is now done properly, since consumer group names are unrestricted and may contain symbols such as commas and quotes.
    * Extending data output table format by adding `GROUP` column for all `--describe` queries
---
 build.gradle                                       |   2 +
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 665 +++++++++++++--------
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../kafka/admin/DeleteConsumerGroupsTest.scala     |  66 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala    | 132 ++--
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 220 +++++--
 gradle/dependencies.gradle                         |   2 +
 7 files changed, 723 insertions(+), 366 deletions(-)

diff --git a/build.gradle b/build.gradle
index ff316c8..c6ab5f7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -638,6 +638,8 @@ project(':core') {
   dependencies {
     compile project(':clients')
     compile libs.jacksonDatabind
+    compile libs.jacksonModuleScala
+    compile libs.jacksonDataformatCsv
     compile libs.jacksonJDK8Datatypes
     compile libs.joptSimple
     compile libs.metrics
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index b40a33b..1ca3515 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -19,10 +19,11 @@ package kafka.admin
 
 import java.text.{ParseException, SimpleDateFormat}
 import java.time.{Duration, Instant}
-import java.util
 import java.util.Properties
 
-import joptsimple.OptionSpec
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
 import kafka.utils._
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
@@ -33,8 +34,11 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
-import scala.collection.{Seq, Set}
+import scala.collection.{Seq, Set, mutable}
 import scala.util.{Failure, Success, Try}
+import joptsimple.OptionSpec
+import scala.collection.immutable.TreeMap
+import scala.reflect.ClassTag
 
 object ConsumerGroupCommand extends Logging {
 
@@ -56,13 +60,13 @@ object ConsumerGroupCommand extends Logging {
       if (opts.options.has(opts.listOpt))
         consumerGroupService.listGroups().foreach(println(_))
       else if (opts.options.has(opts.describeOpt))
-        consumerGroupService.describeGroup()
+        consumerGroupService.describeGroups()
       else if (opts.options.has(opts.deleteOpt))
         consumerGroupService.deleteGroups()
       else if (opts.options.has(opts.resetOffsetsOpt)) {
         val offsetsToReset = consumerGroupService.resetOffsets()
         if (opts.options.has(opts.exportOpt)) {
-          val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset)
+          val exported = consumerGroupService.exportOffsetsToCsv(offsetsToReset)
           println(exported)
         } else
           printOffsetsToReset(offsetsToReset)
@@ -78,7 +82,7 @@ object ConsumerGroupCommand extends Logging {
   val MISSING_COLUMN_VALUE = "-"
 
   def printError(msg: String, e: Option[Throwable] = None): Unit = {
-    println(s"Error: $msg")
+    println(s"\nError: $msg")
     e.foreach(_.printStackTrace())
   }
 
@@ -95,15 +99,18 @@ object ConsumerGroupCommand extends Logging {
     date.getTime
   }
 
-  def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
-    println("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
-
-    groupAssignmentsToReset.foreach {
-      case (consumerAssignment, offsetAndMetadata) =>
-        println("%-30s %-10s %-15s".format(
-          consumerAssignment.topic,
-          consumerAssignment.partition,
-          offsetAndMetadata.offset))
+  def printOffsetsToReset(groupAssignmentsToReset: Map[String, Map[TopicPartition, OffsetAndMetadata]]): Unit = {
+    if (groupAssignmentsToReset.nonEmpty)
+      println("\n%-30s %-30s %-10s %-15s".format("GROUP", "TOPIC", "PARTITION", "NEW-OFFSET"))
+    for {
+      (groupId, assignment) <- groupAssignmentsToReset
+      (consumerAssignment, offsetAndMetadata) <- assignment
+    } {
+      println("%-30s %-30s %-10s %-15s".format(
+        groupId,
+        consumerAssignment.topic,
+        consumerAssignment.partition,
+        offsetAndMetadata.offset))
     }
   }
 
@@ -117,19 +124,61 @@ object ConsumerGroupCommand extends Logging {
 
   private[admin] case class GroupState(group: String, coordinator: Node, assignmentStrategy: String, state: String, numMembers: Int)
 
+  private[admin] sealed trait CsvRecord
+  private[admin] case class CsvRecordWithGroup(group: String, topic: String, partition: Int, offset: Long) extends CsvRecord
+  private[admin] case class CsvRecordNoGroup(topic: String, partition: Int, offset: Long) extends CsvRecord
+  private[admin] object CsvRecordWithGroup {
+    val fields = Array("group", "topic", "partition", "offset")
+  }
+  private[admin] object CsvRecordNoGroup {
+    val fields = Array("topic", "partition", "offset")
+  }
+  // Example: CsvUtils().readerFor[CsvRecordNoGroup]
+  private[admin] case class CsvUtils() {
+    val mapper = new CsvMapper with ScalaObjectMapper
+    mapper.registerModule(DefaultScalaModule)
+    def readerFor[T <: CsvRecord: ClassTag] = {
+      val schema = getSchema[T]
+      val clazz = implicitly[ClassTag[T]].runtimeClass
+      mapper.readerFor(clazz).`with`(schema)
+    }
+    def writerFor[T <: CsvRecord: ClassTag] = {
+      val schema = getSchema[T]
+      val clazz = implicitly[ClassTag[T]].runtimeClass
+      mapper.writerFor(clazz).`with`(schema)
+    }
+    private def getSchema[T <: CsvRecord: ClassTag] = {
+      val clazz = implicitly[ClassTag[T]].runtimeClass
+      val fields = clazz match {
+        case _ if classOf[CsvRecordWithGroup] == clazz => CsvRecordWithGroup.fields
+        case _ if classOf[CsvRecordNoGroup]   == clazz => CsvRecordNoGroup.fields
+      }
+      val schema = mapper.schemaFor(clazz).sortedBy(fields: _*)
+      schema
+    }
+  }
+
   class ConsumerGroupService(val opts: ConsumerGroupCommandOptions) {
 
     private val adminClient = createAdminClient()
 
-    // `consumer` is only needed for `describe`, so we instantiate it lazily
-    private var consumer: KafkaConsumer[String, String] = _
+    // `consumers` are only needed for `describe`, so we instantiate them lazily
+    private lazy val consumers: mutable.Map[String, KafkaConsumer[String, String]] = mutable.Map.empty
 
-    def listGroups(): List[String] = {
-      val result = adminClient.listConsumerGroups(
-        withTimeoutMs(new ListConsumerGroupsOptions))
+    // Declared as a lazy val so the reset plan file is read and parsed at most once, on first access
+    private lazy val resetPlanFromFile: Option[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = {
+      if (opts.options.has(opts.resetFromFileOpt)) {
+        val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
+        val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
+        val resetPlan = parseResetPlan(resetPlanCsv)
+        Some(resetPlan)
+      } else None
+    }
 
-        val listings = result.all.get.asScala
-        listings.map(_.groupId).toList
+    def listGroups(): List[String] = {
+      val result = adminClient.listConsumerGroups(withTimeoutMs(new ListConsumerGroupsOptions))
+      val listings = result.all.get.asScala
+      listings.map(_.groupId).toList
     }
 
     private def shouldPrintMemberState(group: String, state: Option[String], numRows: Option[Int]): Boolean = {
@@ -143,9 +192,9 @@ object ConsumerGroupCommand extends Logging {
           case Some("Dead") =>
             printError(s"Consumer group '$group' does not exist.")
           case Some("Empty") =>
-            Console.err.println(s"Consumer group '$group' has no active members.")
+            Console.err.println(s"\nConsumer group '$group' has no active members.")
           case Some("PreparingRebalance") | Some("CompletingRebalance") =>
-            Console.err.println(s"Warning: Consumer group '$group' is rebalancing.")
+            Console.err.println(s"\nWarning: Consumer group '$group' is rebalancing.")
           case Some("Stable") =>
           case other =>
             // the control should never reach here
@@ -157,113 +206,127 @@ object ConsumerGroupCommand extends Logging {
 
     private def size(colOpt: Option[Seq[Object]]): Option[Int] = colOpt.map(_.size)
 
-    private def printOffsets(group: String, state: Option[String], assignments: Option[Seq[PartitionAssignmentState]]): Unit = {
-      if (shouldPrintMemberState(group, state, size(assignments))) {
-        // find proper columns width
-        var (maxTopicLen, maxConsumerIdLen, maxHostLen) = (15, 15, 15)
-        assignments match {
-          case None => // do nothing
-          case Some(consumerAssignments) =>
-            consumerAssignments.foreach { consumerAssignment =>
-              maxTopicLen = Math.max(maxTopicLen, consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE).length)
-              maxConsumerIdLen = Math.max(maxConsumerIdLen, consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE).length)
-              maxHostLen = Math.max(maxHostLen, consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE).length)
-            }
-        }
+    private def printOffsets(offsets: Map[String, (Option[String], Option[Seq[PartitionAssignmentState]])]): Unit = {
+      for ((groupId, (state, assignments)) <- offsets) {
+        if (shouldPrintMemberState(groupId, state, size(assignments))) {
+          // find proper columns width
+          var (maxGroupLen, maxTopicLen, maxConsumerIdLen, maxHostLen) = (15, 15, 15, 15)
+          assignments match {
+            case None => // do nothing
+            case Some(consumerAssignments) =>
+              consumerAssignments.foreach { consumerAssignment =>
+                maxGroupLen = Math.max(maxGroupLen, consumerAssignment.group.length)
+                maxTopicLen = Math.max(maxTopicLen, consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE).length)
+                maxConsumerIdLen = Math.max(maxConsumerIdLen, consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE).length)
+                maxHostLen = Math.max(maxHostLen, consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE).length)
+              }
+          }
 
-        println(s"\n%${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s %${-maxHostLen}s %s"
-          .format("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID"))
-
-        assignments match {
-          case None => // do nothing
-          case Some(consumerAssignments) =>
-            consumerAssignments.foreach { consumerAssignment =>
-              println(s"%-${maxTopicLen}s %-10s %-15s %-15s %-15s %-${maxConsumerIdLen}s %-${maxHostLen}s %s".format(
-                consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
-                consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
-                consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE),
-                consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
-            }
+          println(s"\n%${-maxGroupLen}s %${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s %${-maxHostLen}s %s"
+            .format("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID"))
+
+          assignments match {
+            case None => // do nothing
+            case Some(consumerAssignments) =>
+              consumerAssignments.foreach { consumerAssignment =>
+                println(s"%${-maxGroupLen}s %${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s %${-maxHostLen}s %s".format(
+                  consumerAssignment.group,
+                  consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
+                  consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
+                  consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE),
+                  consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE))
+                )
+              }
+          }
         }
       }
     }
 
-    private def printMembers(group: String, state: Option[String], assignments: Option[Seq[MemberAssignmentState]], verbose: Boolean): Unit = {
-      if (shouldPrintMemberState(group, state, size(assignments))) {
-        // find proper columns width
-        var (maxConsumerIdLen, maxHostLen, maxClientIdLen) = (15, 15, 15)
-        assignments match {
-          case None => // do nothing
-          case Some(memberAssignments) =>
-            memberAssignments.foreach { memberAssignment =>
-              maxConsumerIdLen = Math.max(maxConsumerIdLen, memberAssignment.consumerId.length)
-              maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
-              maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length)
-            }
-        }
+    private def printMembers(members: Map[String, (Option[String], Option[Seq[MemberAssignmentState]])], verbose: Boolean): Unit = {
+      for ((groupId, (state, assignments)) <- members) {
+        if (shouldPrintMemberState(groupId, state, size(assignments))) {
+          // find proper columns width
+          var (maxGroupLen, maxConsumerIdLen, maxHostLen, maxClientIdLen) = (15, 15, 15, 15)
+          assignments match {
+            case None => // do nothing
+            case Some(memberAssignments) =>
+              memberAssignments.foreach { memberAssignment =>
+                maxGroupLen = Math.max(maxGroupLen, memberAssignment.group.length)
+                maxConsumerIdLen = Math.max(maxConsumerIdLen, memberAssignment.consumerId.length)
+                maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
+                maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length)
+              }
+          }
 
-        print(s"\n%${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s "
-          .format("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"))
-        if (verbose)
-          print("%s".format("ASSIGNMENT"))
-        println()
-
-        assignments match {
-          case None => // do nothing
-          case Some(memberAssignments) =>
-            memberAssignments.foreach { memberAssignment =>
-              print(s"%${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s ".format(
-                memberAssignment.consumerId, memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions))
-              if (verbose) {
-                val partitions = memberAssignment.assignment match {
-                  case List() => MISSING_COLUMN_VALUE
-                  case assignment =>
-                    assignment.groupBy(_.topic).map {
-                      case (topic, partitionList) => topic + partitionList.map(_.partition).sorted.mkString("(", ",", ")")
-                    }.toList.sorted.mkString(", ")
+          print(s"\n%${-maxGroupLen}s %${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s "
+            .format("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"))
+          if (verbose)
+            print(s"%s".format("ASSIGNMENT"))
+          println()
+
+          assignments match {
+            case None => // do nothing
+            case Some(memberAssignments) =>
+              memberAssignments.foreach { memberAssignment =>
+                print(s"%${-maxGroupLen}s %${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s ".format(
+                  memberAssignment.group, memberAssignment.consumerId, memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions))
+                if (verbose) {
+                  val partitions = memberAssignment.assignment match {
+                    case List() => MISSING_COLUMN_VALUE
+                    case assignment =>
+                      assignment.groupBy(_.topic).map {
+                        case (topic, partitionList) => topic + partitionList.map(_.partition).sorted.mkString("(", ",", ")")
+                      }.toList.sorted.mkString(", ")
+                  }
+                  print(s"%s".format(partitions))
                 }
-                print("%s".format(partitions))
+                println()
               }
-              println()
-            }
+          }
         }
       }
     }
 
-    private def printState(group: String, state: GroupState): Unit = {
-      if (shouldPrintMemberState(group, Some(state.state), Some(1))) {
-        val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-        val coordinatorColLen = Math.max(25, coordinator.length)
-        print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format("COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-        print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format(coordinator, state.assignmentStrategy, state.state, state.numMembers))
-        println()
+    private def printStates(states: Map[String, GroupState]): Unit = {
+      for ((groupId, state) <- states) {
+        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+          val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
+          val coordinatorColLen = Math.max(25, coordinator.length)
+          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
+          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
+          println()
+        }
       }
     }
 
-    def describeGroup(): Unit = {
-      val group = opts.options.valuesOf(opts.groupOpt).asScala.head
+    def describeGroups(): Unit = {
+      val groupIds =
+        if (opts.options.has(opts.allGroupsOpt)) listGroups()
+        else opts.options.valuesOf(opts.groupOpt).asScala
       val membersOptPresent = opts.options.has(opts.membersOpt)
       val stateOptPresent = opts.options.has(opts.stateOpt)
       val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
       val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)
 
       if (subActions == 0 || offsetsOptPresent) {
-        val offsets = collectGroupOffsets()
-        printOffsets(group, offsets._1, offsets._2)
+        val offsets = collectGroupsOffsets(groupIds)
+        printOffsets(offsets)
       } else if (membersOptPresent) {
-        val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
-        printMembers(group, members._1, members._2, opts.options.has(opts.verboseOpt))
-      } else
-        printState(group, collectGroupState())
+        val members = collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt))
+        printMembers(members, opts.options.has(opts.verboseOpt))
+      } else {
+        val states = collectGroupsState(groupIds)
+        printStates(states)
+      }
     }
 
     private def collectConsumerAssignment(group: String,
-                                            coordinator: Option[Node],
-                                            topicPartitions: Seq[TopicPartition],
-                                            getPartitionOffset: TopicPartition => Option[Long],
-                                            consumerIdOpt: Option[String],
-                                            hostOpt: Option[String],
-                                            clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
+                                          coordinator: Option[Node],
+                                          topicPartitions: Seq[TopicPartition],
+                                          getPartitionOffset: TopicPartition => Option[Long],
+                                          consumerIdOpt: Option[String],
+                                          hostOpt: Option[String],
+                                          clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
       if (topicPartitions.isEmpty) {
         Array[PartitionAssignmentState](
           PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None)
@@ -290,109 +353,142 @@ object ConsumerGroupCommand extends Logging {
           getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
       }
 
-      getLogEndOffsets(topicPartitions).map {
-        case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
-        case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
+      getLogEndOffsets(group, topicPartitions).map {
+        logEndOffsetResult =>
+          logEndOffsetResult._2 match {
+            case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
+            case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1, None)
+            case LogOffsetResult.Ignore => null
+          }
       }.toArray
     }
 
-    def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = {
-      val groupId = opts.options.valueOf(opts.groupOpt)
+    def resetOffsets(): Map[String, Map[TopicPartition, OffsetAndMetadata]] = {
+      val groupIds =
+        if (opts.options.has(opts.allGroupsOpt)) listGroups()
+        else opts.options.valuesOf(opts.groupOpt).asScala
+
       val consumerGroups = adminClient.describeConsumerGroups(
-        util.Arrays.asList(groupId),
+        groupIds.asJava,
         withTimeoutMs(new DescribeConsumerGroupsOptions)
       ).describedGroups()
 
-      val group = consumerGroups.get(groupId).get
-      group.state.toString match {
-        case "Empty" | "Dead" =>
-          val partitionsToReset = getPartitionsToReset(groupId)
-          val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
-
-          // Dry-run is the default behavior if --execute is not specified
-          val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
-          if (!dryRun)
-            getConsumer.commitSync(preparedOffsets.asJava)
-          preparedOffsets
-        case currentState =>
-          printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
-          Map.empty
+      val result =
+        consumerGroups.asScala.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) {
+          case (acc, (groupId, groupDescription)) =>
+            groupDescription.get.state().toString match {
+              case "Empty" | "Dead" =>
+                val partitionsToReset = getPartitionsToReset(groupId)
+                val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
+
+                // Dry-run is the default behavior if --execute is not specified
+                val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
+                if (!dryRun)
+                  getConsumer(groupId).commitSync(preparedOffsets.asJava)
+                acc.updated(groupId, preparedOffsets)
+              case currentState =>
+                printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
+                acc.updated(groupId, Map.empty)
+            }
+        }
+      result
+    }
+
+    private[admin] def describeConsumerGroups(groupIds: Seq[String]): mutable.Map[String, ConsumerGroupDescription] = {
+      adminClient.describeConsumerGroups(
+        groupIds.asJava,
+        withTimeoutMs(new DescribeConsumerGroupsOptions)
+      ).describedGroups().asScala.map {
+        case (groupId, groupDescriptionFuture) => (groupId, groupDescriptionFuture.get())
       }
     }
 
     /**
       * Returns the state of the specified consumer group and partition assignment states
       */
-    def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
-      val groupId = opts.options.valueOf(opts.groupOpt)
-      val consumerGroup = adminClient.describeConsumerGroups(
-        List(groupId).asJava,
-        withTimeoutMs(new DescribeConsumerGroupsOptions())
-      ).describedGroups.get(groupId).get
-
-      val state = consumerGroup.state
-      val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
-      var assignedTopicPartitions = ListBuffer[TopicPartition]()
-      val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
-        .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
-        val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
-        assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
-        val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
-          .map { topicPartition =>
-            topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
-          }.toMap
-
-        collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
-          partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
-          Some(s"${consumerSummary.clientId}"))
-      }
-
-      val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
-        case (topicPartition, offset) =>
-          collectConsumerAssignment(
-            groupId,
-            Option(consumerGroup.coordinator),
-            Seq(topicPartition),
-            Map(topicPartition -> Some(offset.offset)),
-                                  Some(MISSING_COLUMN_VALUE),
-                                  Some(MISSING_COLUMN_VALUE),
-                                  Some(MISSING_COLUMN_VALUE))
-      }
-
-      (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
+    def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
+      collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
     }
 
-    private[admin] def collectGroupMembers(verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = {
-      val groupId = opts.options.valueOf(opts.groupOpt)
-      val consumerGroups = adminClient.describeConsumerGroups(
-        List(groupId).asJava,
-        withTimeoutMs(new DescribeConsumerGroupsOptions)
-      ).describedGroups()
-
-      val group = consumerGroups.get(groupId).get
-      val state = group.state
+    /**
+      * Returns states of the specified consumer groups and partition assignment states
+      */
+    def collectGroupsOffsets(groupIds: Seq[String]): TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])] = {
+      val consumerGroups = describeConsumerGroups(groupIds)
+
+      val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
+        val state = consumerGroup.state
+        val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
+        var assignedTopicPartitions = ListBuffer[TopicPartition]()
+        val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
+          .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
+          val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
+          assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
+          val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
+            .map { topicPartition =>
+              topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
+            }.toMap
+          collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
+            partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
+            Some(s"${consumerSummary.clientId}"))
+        }
+        val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
+          case (topicPartition, offset) =>
+            collectConsumerAssignment(
+              groupId,
+              Option(consumerGroup.coordinator),
+              Seq(topicPartition),
+              Map(topicPartition -> Some(offset.offset)),
+              Some(MISSING_COLUMN_VALUE),
+              Some(MISSING_COLUMN_VALUE),
+              Some(MISSING_COLUMN_VALUE))
+        }
+        groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
+      }).toMap
 
-      (Some(state.toString),
-        Option(group.members().asScala.map {
-          consumer => MemberAssignmentState(groupId, consumer.consumerId, consumer.host, consumer.clientId, consumer.assignment.topicPartitions.size(),
-            if (verbose) consumer.assignment.topicPartitions.asScala.toList else List())
-        }.toList))
+      groupOffsets
     }
 
-    private[admin] def collectGroupState(): GroupState = {
-      val groupId = opts.options.valueOf(opts.groupOpt)
-      val consumerGroups = adminClient.describeConsumerGroups(
-        util.Arrays.asList(groupId),
-        withTimeoutMs(new DescribeConsumerGroupsOptions)
-      ).describedGroups()
+    private[admin] def collectGroupMembers(groupId: String, verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = {
+      collectGroupsMembers(Seq(groupId), verbose)(groupId)
+    }
 
-      val group = consumerGroups.get(groupId).get
-      GroupState(groupId, group.coordinator, group.partitionAssignor(),
-        group.state.toString, group.members().size)
+    private[admin] def collectGroupsMembers(groupIds: Seq[String], verbose: Boolean): TreeMap[String, (Option[String], Option[Seq[MemberAssignmentState]])] = {
+      val consumerGroups = describeConsumerGroups(groupIds)
+      TreeMap[String, (Option[String], Option[Seq[MemberAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
+        val state = consumerGroup.state.toString
+        val memberAssignmentStates = consumerGroup.members().asScala.map(consumer =>
+          MemberAssignmentState(
+            groupId,
+            consumer.consumerId,
+            consumer.host,
+            consumer.clientId,
+            consumer.assignment.topicPartitions.size(),
+            if (verbose) consumer.assignment.topicPartitions.asScala.toList else List()
+          )).toList
+        groupId -> (Some(state), Option(memberAssignmentStates))
+      }).toMap
+    }
+
+    private[admin] def collectGroupState(groupId: String): GroupState = {
+      collectGroupsState(Seq(groupId))(groupId)
+    }
+
+    private[admin] def collectGroupsState(groupIds: Seq[String]): TreeMap[String, GroupState] = {
+      val consumerGroups = describeConsumerGroups(groupIds)
+      TreeMap[String, GroupState]() ++ (for ((groupId, groupDescription) <- consumerGroups) yield {
+        groupId -> GroupState(
+          groupId,
+          groupDescription.coordinator,
+          groupDescription.partitionAssignor(),
+          groupDescription.state.toString,
+          groupDescription.members().size
+        )
+      }).toMap
     }
 
-    private def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
-      val offsets = getConsumer.endOffsets(topicPartitions.asJava)
+    private def getLogEndOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
+      val offsets = getConsumer(groupId).endOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
         Option(offsets.get(topicPartition)) match {
           case Some(logEndOffset) => topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
@@ -401,8 +497,8 @@ object ConsumerGroupCommand extends Logging {
       }.toMap
     }
 
-    private def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
-      val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
+    private def getLogStartOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
+      val offsets = getConsumer(groupId).beginningOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
         Option(offsets.get(topicPartition)) match {
           case Some(logStartOffset) => topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
@@ -411,8 +507,8 @@ object ConsumerGroupCommand extends Logging {
       }.toMap
     }
 
-    private def getLogTimestampOffsets(topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
-      val consumer = getConsumer
+    private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
+      val consumer = getConsumer(groupId)
       consumer.assign(topicPartitions.asJava)
 
       val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
@@ -422,12 +518,14 @@ object ConsumerGroupCommand extends Logging {
         case (topicPartition, offsetAndTimestamp) => topicPartition -> LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
       }.toMap
 
-      successfulLogTimestampOffsets ++ getLogEndOffsets(unsuccessfulOffsetsForTimes.keySet.toSeq)
+      successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)
     }
 
     def close() {
       adminClient.close()
-      if (consumer != null) consumer.close()
+      // Shut down every lazily created per-group consumer, skipping null entries.
+      for (groupConsumer <- consumers.values if groupConsumer != null)
+        groupConsumer.close()
     }
 
     private def createAdminClient(): admin.AdminClient = {
@@ -436,18 +534,18 @@ object ConsumerGroupCommand extends Logging {
       admin.AdminClient.create(props)
     }
 
-    private def getConsumer = {
-      if (consumer == null)
-        consumer = createConsumer()
-      consumer
+    // Returns the consumer cached for groupId, creating and caching it on first use.
+    private def getConsumer(groupId: String) = {
+      // getOrElseUpdate replaces the separate get / update / apply lookups with one call
+      consumers.getOrElseUpdate(groupId, createConsumer(groupId))
     }
 
-    private def createConsumer(): KafkaConsumer[String, String] = {
+    private def createConsumer(groupId: String): KafkaConsumer[String, String] = {
       val properties = new Properties()
       val deserializer = (new StringDeserializer).getClass.getName
       val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
       properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-      properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
+      properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
       properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
       properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
       properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
@@ -467,12 +565,12 @@ object ConsumerGroupCommand extends Logging {
       options.timeoutMs(t)
     }
 
-    private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
+    private def parseTopicPartitionsToReset(groupId: String, topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap { // expands each topic argument ("topic" or "topic:p1,p2") into TopicPartitions
       case topicArg if topicArg.contains(":") =>
         val topicPartitions = topicArg.split(":")
         val topic = topicPartitions(0)
         topicPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt))
-      case topic => getConsumer.partitionsFor(topic).asScala
+      case topic => getConsumer(groupId).partitionsFor(topic).asScala // no explicit partition list: use every partition the group's consumer reports for the topic
         .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
     }
 
@@ -482,7 +580,7 @@ object ConsumerGroupCommand extends Logging {
         allTopicPartitions
       } else if (opts.options.has(opts.topicOpt)) {
         val topics = opts.options.valuesOf(opts.topicOpt).asScala
-        parseTopicPartitionsToReset(topics)
+        parseTopicPartitionsToReset(groupId, topics)
       } else {
         if (opts.options.has(opts.resetFromFileOpt))
           Nil
@@ -498,24 +596,46 @@ object ConsumerGroupCommand extends Logging {
       ).partitionsToOffsetAndMetadata.get
     }
 
-    private def parseResetPlan(resetPlanCsv: String): Map[TopicPartition, OffsetAndMetadata] = {
-      resetPlanCsv.split("\n")
-        .map { line =>
-          val Array(topic, partition, offset) = line.split(",").map(_.trim)
-          val topicPartition = new TopicPartition(topic, partition.toInt)
-          val offsetAndMetadata = new OffsetAndMetadata(offset.toLong)
-          (topicPartition, offsetAndMetadata)
-        }.toMap
+    type GroupMetadata = Map[String, Map[TopicPartition, OffsetAndMetadata]]
+    // Parses a reset plan CSV into offsets grouped by consumer group. Two row layouts are accepted:
+    // "topic,partition,offset" (single group) and "group,topic,partition,offset" (multiple groups).
+    private def parseResetPlan(resetPlanCsv: String): GroupMetadata = {
+      val emptyPlan = Map[String, Map[TopicPartition, OffsetAndMetadata]]()
+      // Adds one row to the plan; a later row for the same group and partition replaces the earlier one.
+      def withRow(plan: GroupMetadata, group: String, topic: String, partition: Int, offset: Long) = {
+        val offsetsOfGroup = plan.getOrElse(group, Map())
+        plan.updated(group, offsetsOfGroup.updated(new TopicPartition(topic, partition), new OffsetAndMetadata(offset)))
+      }
+      val noGroupReader = CsvUtils().readerFor[CsvRecordNoGroup]
+      val rows = resetPlanCsv.split("\n")
+      val isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1
+      // The layout is detected by probing whether the first row parses as a group-less record.
+      val firstRowHasNoGroup = rows.headOption.exists(row => Try(noGroupReader.readValue[CsvRecordNoGroup](row)).isSuccess)
+      if (isSingleGroupQuery && firstRowHasNoGroup) {
+        val group = opts.options.valueOf(opts.groupOpt)
+        rows.foldLeft(emptyPlan) { (plan, row) =>
+          val CsvRecordNoGroup(topic, partition, offset) = noGroupReader.readValue[CsvRecordNoGroup](row)
+          withRow(plan, group, topic, partition, offset)
+        }
+      } else {
+        // Multi-group layout: every row names its own group.
+        val withGroupReader = CsvUtils().readerFor[CsvRecordWithGroup]
+        rows.foldLeft(emptyPlan) { (plan, row) =>
+          val CsvRecordWithGroup(group, topic, partition, offset) = withGroupReader.readValue[CsvRecordWithGroup](row)
+          withRow(plan, group, topic, partition, offset)
+        }
+      }
     }
 
-    private def prepareOffsetsToReset(groupId: String, partitionsToReset: Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
+    private def prepareOffsetsToReset(groupId: String,
+                                      partitionsToReset: Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
       if (opts.options.has(opts.resetToOffsetOpt)) {
         val offset = opts.options.valueOf(opts.resetToOffsetOpt)
-        checkOffsetsRange(partitionsToReset.map((_, offset)).toMap).map {
+        checkOffsetsRange(groupId, partitionsToReset.map((_, offset)).toMap).map {
           case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToEarliestOpt)) {
-        val logStartOffsets = getLogStartOffsets(partitionsToReset)
+        val logStartOffsets = getLogStartOffsets(groupId, partitionsToReset)
         partitionsToReset.map { topicPartition =>
           logStartOffsets.get(topicPartition) match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
@@ -523,7 +643,7 @@ object ConsumerGroupCommand extends Logging {
           }
         }.toMap
       } else if (opts.options.has(opts.resetToLatestOpt)) {
-        val logEndOffsets = getLogEndOffsets(partitionsToReset)
+        val logEndOffsets = getLogEndOffsets(groupId, partitionsToReset)
         partitionsToReset.map { topicPartition =>
           logEndOffsets.get(topicPartition) match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
@@ -538,12 +658,12 @@ object ConsumerGroupCommand extends Logging {
             throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")).offset
           (topicPartition, currentOffset + shiftBy)
         }.toMap
-        checkOffsetsRange(requestedOffsets).map {
+        checkOffsetsRange(groupId, requestedOffsets).map {
           case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
         val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
-        val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
+        val logTimestampOffsets = getLogTimestampOffsets(groupId, partitionsToReset, timestamp)
         partitionsToReset.map { topicPartition =>
           val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
@@ -555,8 +675,9 @@ object ConsumerGroupCommand extends Logging {
         val duration = opts.options.valueOf(opts.resetByDurationOpt)
         val durationParsed = Duration.parse(duration)
         val now = Instant.now()
+        // Instant is immutable: the cutoff is the value returned by minus(), so no in-place adjustment of `now` is possible or needed
         val timestamp = now.minus(durationParsed).toEpochMilli
-        val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
+        val logTimestampOffsets = getLogTimestampOffsets(groupId, partitionsToReset, timestamp)
         partitionsToReset.map { topicPartition =>
           val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
@@ -564,16 +685,20 @@ object ConsumerGroupCommand extends Logging {
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
           }
         }.toMap
-      } else if (opts.options.has(opts.resetFromFileOpt)) {
-        val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
-        val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
-        val resetPlan = parseResetPlan(resetPlanCsv)
-        val requestedOffsets = resetPlan.keySet.map { topicPartition =>
-          (topicPartition, resetPlan(topicPartition).offset)
-        }.toMap
-        checkOffsetsRange(requestedOffsets).map {
-          case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
-        }
+      } else if (resetPlanFromFile.isDefined) {
+        resetPlanFromFile.map(resetPlan => resetPlan.get(groupId).map { resetPlanForGroup =>
+          val requestedOffsets = resetPlanForGroup.keySet.map { topicPartition =>
+            topicPartition -> resetPlanForGroup(topicPartition).offset
+          }.toMap
+          checkOffsetsRange(groupId, requestedOffsets).map {
+            case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
+          }
+        } match {
+          case Some(resetPlanForGroup) => resetPlanForGroup
+          case None =>
+            printError(s"No reset plan for group $groupId found")
+            Map[TopicPartition, OffsetAndMetadata]()
+        }).getOrElse(Map.empty)
       } else if (opts.options.has(opts.resetToCurrentOpt)) {
         val currentCommittedOffsets = getCommittedOffsets(groupId)
         val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset) =
@@ -586,7 +711,7 @@ object ConsumerGroupCommand extends Logging {
           }))
         }.toMap
 
-        val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset).map {
+        val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(groupId, partitionsToResetWithoutCommittedOffset).map {
           case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
           case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
         }
@@ -597,9 +722,9 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    private def checkOffsetsRange(requestedOffsets: Map[TopicPartition, Long]) = {
-      val logStartOffsets = getLogStartOffsets(requestedOffsets.keySet.toSeq)
-      val logEndOffsets = getLogEndOffsets(requestedOffsets.keySet.toSeq)
+    private def checkOffsetsRange(groupId: String, requestedOffsets: Map[TopicPartition, Long]) = {
+      val logStartOffsets = getLogStartOffsets(groupId, requestedOffsets.keySet.toSeq)
+      val logEndOffsets = getLogEndOffsets(groupId, requestedOffsets.keySet.toSeq)
       requestedOffsets.map { case (topicPartition, offset) => (topicPartition,
         logEndOffsets.get(topicPartition) match {
           case Some(LogOffsetResult.LogOffset(endOffset)) if offset > endOffset =>
@@ -620,19 +745,33 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = {
-      val rows = assignmentsToReset.map { case (k,v) => s"${k.topic},${k.partition},${v.offset}" }(collection.breakOut): List[String]
-      rows.foldRight("")(_ + "\n" + _)
+    // Serialises per-group offsets as CSV text; rows carry a group column unless exactly one group option value was given.
+    def exportOffsetsToCsv(assignments: Map[String, Map[TopicPartition, OffsetAndMetadata]]): String = {
+      val omitGroupColumn = opts.options.valuesOf(opts.groupOpt).size() == 1
+      val rowWriter =
+        if (omitGroupColumn) CsvUtils().writerFor[CsvRecordNoGroup]
+        else CsvUtils().writerFor[CsvRecordWithGroup]
+      // One CSV row per (group, topic partition) pair, in the iteration order of the supplied maps.
+      val csvRows = for {
+        (groupId, offsetsByPartition) <- assignments
+        (topicPartition, committed) <- offsetsByPartition
+      } yield rowWriter.writeValueAsString(
+        if (omitGroupColumn) CsvRecordNoGroup(topicPartition.topic, topicPartition.partition, committed.offset)
+        else CsvRecordWithGroup(groupId, topicPartition.topic, topicPartition.partition, committed.offset))
+      csvRows.mkString("")
     }
 
     def deleteGroups(): Map[String, Throwable] = {
-      val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList
-      val deletedGroups = adminClient.deleteConsumerGroups(
-        groupsToDelete.asJava,
+      val groupIds =
+        if (opts.options.has(opts.allGroupsOpt)) listGroups()
+        else opts.options.valuesOf(opts.groupOpt).asScala
+
+      val groupsToDelete = adminClient.deleteConsumerGroups(
+        groupIds.asJava,
         withTimeoutMs(new DeleteConsumerGroupsOptions)
       ).deletedGroups().asScala
 
-      val result = deletedGroups.mapValues { f =>
+      val result = groupsToDelete.mapValues { f =>
         Try(f.get) match {
           case _: Success[_] => null
           case Failure(e) => e
@@ -676,6 +815,7 @@ object ConsumerGroupCommand extends Logging {
     val AllTopicsDoc = "Consider all topics assigned to a group in the `reset-offsets` process."
     val ListDoc = "List all consumer groups."
     val DescribeDoc = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."
+    val AllGroupsDoc = "Apply to all consumer groups."
     val nl = System.getProperty("line.separator")
     val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " +
       "over the entire consumer group. For instance --group g1 --group g2"
@@ -725,6 +865,7 @@ object ConsumerGroupCommand extends Logging {
     val allTopicsOpt = parser.accepts("all-topics", AllTopicsDoc)
     val listOpt = parser.accepts("list", ListDoc)
     val describeOpt = parser.accepts("describe", DescribeDoc)
+    val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
     val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
                              .withRequiredArg
@@ -775,28 +916,32 @@ object ConsumerGroupCommand extends Logging {
 
     options = parser.parse(args : _*)
 
-    val describeOptPresent = options.has(describeOpt)
-
-    val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
+    val allGroupSelectionScopeOpts: Set[OptionSpec[_]] = Set(groupOpt, allGroupsOpt)
+    val allConsumerGroupLevelOpts: Set[OptionSpec[_]]  = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
     val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
       resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)
 
     def checkArgs() {
-      // check required args
-      if (options.has(timeoutMsOpt) && !describeOptPresent)
-        debug(s"Option $timeoutMsOpt is applicable only when $describeOpt is used.")
 
       CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
-      if (options.has(deleteOpt) && options.has(topicOpt))
+      if (options.has(describeOpt)) {
+        if (!options.has(groupOpt) && !options.has(allGroupsOpt))
+          CommandLineUtils.printUsageAndDie(parser,
+            s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
+      } else {
+        if (options.has(timeoutMsOpt))
+          debug(s"Option $timeoutMsOpt is applicable only when $describeOpt is used.")
+      }
+
+      if (options.has(deleteOpt)) {
+        if (!options.has(groupOpt) && !options.has(allGroupsOpt))
+          CommandLineUtils.printUsageAndDie(parser,
+            s"Option $deleteOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
+        if (options.has(topicOpt))
           CommandLineUtils.printUsageAndDie(parser, s"The consumer does not support topic-specific offset " +
             "deletion from a consumer group.")
-
-      if (describeOptPresent)
-        CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
-
-      if (options.has(deleteOpt) && !options.has(groupOpt))
-        CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt takes $groupOpt")
+      }
 
       if (options.has(resetOffsetsOpt)) {
         if (options.has(dryRunOpt) && options.has(executeOpt))
@@ -809,18 +954,20 @@ object ConsumerGroupCommand extends Logging {
             "if you are scripting this command and want to keep the current default behavior without prompting.")
         }
 
-        CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt)
+        if (!options.has(groupOpt) && !options.has(allGroupsOpt))
+          CommandLineUtils.printUsageAndDie(parser,
+            s"Option $resetOffsetsOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt,   allResetOffsetScenarioOpts - resetToOffsetOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt,   allResetOffsetScenarioOpts - resetToLatestOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt,  allResetOffsetScenarioOpts - resetToCurrentOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt,    allResetOffsetScenarioOpts - resetShiftByOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt,   allResetOffsetScenarioOpts - resetFromFileOpt)
       }
 
-      // check invalid args
+      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allGroupSelectionScopeOpts - groupOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 336bfb1..9b15389 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1110,7 +1110,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupService = new ConsumerGroupService(opts)
-    consumerGroupService.describeGroup()
+    consumerGroupService.describeGroups()
     consumerGroupService.close()
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 1bcc316..29c2915 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -42,7 +42,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
     assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
-        output.contains(s"Group '$missingGroup' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message))
+      output.contains(s"Group '$missingGroup' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message))
   }
 
   @Test
@@ -56,7 +56,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     val result = service.deleteGroups()
     assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
-      result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).get.getCause
+      result.size == 1 && result.keySet.contains(missingGroup) && result(missingGroup).getCause
         .isInstanceOf[GroupIdNotFoundException])
   }
 
@@ -70,7 +70,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      service.collectGroupMembers(false)._2.get.size == 1
+      service.collectGroupMembers(group, false)._2.get.size == 1
     }, "The group did not initialize as expected.", maxRetries = 3)
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
@@ -88,13 +88,13 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      service.collectGroupMembers(false)._2.get.size == 1
+      service.collectGroupMembers(group, false)._2.get.size == 1
     }, "The group did not initialize as expected.", maxRetries = 3)
 
     val result = service.deleteGroups()
-    assertNotNull(s"Group was deleted successfully, but it shouldn't have been. Result was:(${result})", result.get(group).get)
+    assertNotNull(s"Group was deleted successfully, but it shouldn't have been. Result was:(${result})", result(group))
     assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group. Result was:(${result})",
-      result.size == 1 && result.keySet.contains(group) && result.get(group).get.getCause.isInstanceOf[GroupNotEmptyException])
+      result.size == 1 && result.keySet.contains(group) && result(group).getCause.isInstanceOf[GroupNotEmptyException])
   }
 
   @Test
@@ -113,7 +113,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      service.collectGroupState().state == "Empty"
+      service.collectGroupState(group).state == "Empty"
     }, "The group did become empty as expected.", maxRetries = 3)
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
@@ -122,6 +122,43 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
   }
 
   @Test
+  // Verifies that --delete --all-groups removes every (empty) consumer group and reports them all.
+  def testDeleteCmdAllGroups() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    // Create 3 groups with 1 consumer per each
+    val groups =
+      (for (i <- 1 to 3) yield {
+        val group = this.group + i
+        val executor = addConsumerGroupExecutor(numConsumers = 1, group = group)
+        group -> executor
+      }).toMap
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--all-groups")
+    val service = getConsumerGroupService(cgcArgs)
+    // Wait until exactly the created groups are listed; a forall over the listing is vacuously true while it is still empty
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().toSet == groups.keySet
+    }, "The group did not initialize as expected.", maxRetries = 3)
+
+    // Shutdown consumers to empty out groups
+    groups.values.foreach(executor => executor.shutdown())
+
+    TestUtils.waitUntilTrue(() => {
+      groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Empty")
+    }, "The groups did not become empty as expected.", maxRetries = 3)
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups()).trim
+    val expectedGroupsForDeletion = groups.keySet
+    val deletedGroupsGrepped = output.substring(output.indexOf('(') + 1, output.indexOf(')')).split(',')
+      .map(_.replaceAll("'", "").trim).toSet
+
+    assertTrue(s"The consumer group(s) could not be deleted as expected",
+      output.matches(s"Deletion of requested consumer groups (.*) was successful.")
+        && deletedGroupsGrepped == expectedGroupsForDeletion
+    )
+  }
+
+  @Test
   def testDeleteEmptyGroup() {
     TestUtils.createOffsetsTopic(zkClient, servers)
 
@@ -137,12 +174,12 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      service.collectGroupState().state == "Empty"
+      service.collectGroupState(group).state == "Empty"
     }, "The group did become empty as expected.", maxRetries = 3)
 
     val result = service.deleteGroups()
     assertTrue(s"The consumer group could not be deleted as expected",
-      result.size == 1 && result.keySet.contains(group) && result.get(group).get == null)
+      result.size == 1 && result.keySet.contains(group) && result(group) == null)
   }
 
   @Test
@@ -162,14 +199,14 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      service.collectGroupState().state == "Empty"
+      service.collectGroupState(group).state == "Empty"
     }, "The group did become empty as expected.", maxRetries = 3)
 
     val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
     val output = TestUtils.grabConsoleOutput(service2.deleteGroups())
     assertTrue(s"The consumer group deletion did not work as expected",
       output.contains(s"Group '$missingGroup' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message) &&
-      output.contains(s"These consumer groups were deleted successfully: '$group'"))
+        output.contains(s"These consumer groups were deleted successfully: '$group'"))
   }
 
   @Test
@@ -189,15 +226,16 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      service.collectGroupState().state == "Empty"
+      service.collectGroupState(group).state == "Empty"
     }, "The group did become empty as expected.", maxRetries = 3)
 
     val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
     val result = service2.deleteGroups()
     assertTrue(s"The consumer group deletion did not work as expected",
       result.size == 2 &&
-        result.keySet.contains(group) && result.get(group).get == null &&
-        result.keySet.contains(missingGroup) && result.get(missingGroup).get.getMessage.contains(Errors.GROUP_ID_NOT_FOUND.message))
+        result.keySet.contains(group) && result(group) == null &&
+        result.keySet.contains(missingGroup) &&
+        result(missingGroup).getMessage.contains(Errors.GROUP_ID_NOT_FOUND.message))
   }
 
 
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 88f9f4a..dcad72a 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -22,7 +22,7 @@ import joptsimple.OptionException
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{TimeoutException}
+import org.apache.kafka.common.errors.TimeoutException
 import org.junit.Assert._
 import org.junit.Test
 
@@ -46,7 +46,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", missingGroup) ++ describeType
       val service = getConsumerGroupService(cgcArgs)
 
-      val output = TestUtils.grabConsoleOutput(service.describeGroup())
+      val output = TestUtils.grabConsoleOutput(service.describeGroups())
       assertTrue(s"Expected error was not detected for describe option '${describeType.mkString(" ")}'",
           output.contains(s"Consumer group '$missingGroup' does not exist."))
     }
@@ -61,49 +61,52 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
 
   @Test
   def testDescribeOffsetsOfNonExistingGroup() {
+    val group = "missing.group"
     TestUtils.createOffsetsTopic(zkClient, servers)
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
     // note the group to be queried is a different (non-existing) group
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val service = getConsumerGroupService(cgcArgs)
 
-    val (state, assignments) = service.collectGroupOffsets()
+    val (state, assignments) = service.collectGroupOffsets(group)
     assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
         state.contains("Dead") && assignments.contains(List()))
   }
 
   @Test
   def testDescribeMembersOfNonExistingGroup() {
+    val group = "missing.group"
     TestUtils.createOffsetsTopic(zkClient, servers)
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
     // note the group to be queried is a different (non-existing) group
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val service = getConsumerGroupService(cgcArgs)
 
-    val (state, assignments) = service.collectGroupMembers(false)
+    val (state, assignments) = service.collectGroupMembers(group, false)
     assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
         state.contains("Dead") && assignments.contains(List()))
 
-    val (state2, assignments2) = service.collectGroupMembers(true)
+    val (state2, assignments2) = service.collectGroupMembers(group, true)
     assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group' (verbose option).",
         state2.contains("Dead") && assignments2.contains(List()))
   }
 
   @Test
   def testDescribeStateOfNonExistingGroup() {
+    val group = "missing.group"
     TestUtils.createOffsetsTopic(zkClient, servers)
 
     // run one consumer in the group consuming from a single-partition topic
     addConsumerGroupExecutor(numConsumers = 1)
     // note the group to be queried is a different (non-existing) group
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val service = getConsumerGroupService(cgcArgs)
 
-    val state = service.collectGroupState()
+    val state = service.collectGroupState(group)
     assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
         state.state == "Dead" && state.numMembers == 0 &&
         state.coordinator != null && servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
@@ -122,13 +125,62 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       val service = getConsumerGroupService(cgcArgs)
 
       TestUtils.waitUntilTrue(() => {
-        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         output.trim.split("\n").length == 2 && error.isEmpty
       }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
     }
   }
 
   @Test
+  def testDescribeExistingGroups() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // Create one single-consumer group per describe type, collecting a "--group <name>" argument pair for each
+    val groups = (for (describeType <- describeTypes) yield {
+      val group = this.group + describeType.mkString("")
+      addConsumerGroupExecutor(numConsumers = 1, group = group)
+      Array("--group", group)
+    }).flatten
+
+    val expectedNumLines = describeTypes.length * 2 // two lines per created group; presumably header + one data row
+
+    for (describeType <- describeTypes) {
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe") ++ groups ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
+        val numLines = output.trim.split("\n").count(_.nonEmpty)
+        (numLines == expectedNumLines) && error.isEmpty
+      }, s"Expected $expectedNumLines non-empty output lines and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
+    }
+  }
+
+  @Test
+  def testDescribeAllExistingGroups() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // Create one single-consumer group per describe type; they are later selected via --all-groups
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      addConsumerGroupExecutor(numConsumers = 1, group = group)
+    }
+
+    val expectedNumLines = describeTypes.length * 2 // two lines per created group; presumably header + one data row
+
+    for (describeType <- describeTypes) {
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--all-groups") ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
+        val numLines = output.trim.split("\n").count(_.nonEmpty)
+        (numLines == expectedNumLines) && error.isEmpty
+      }, s"Expected $expectedNumLines non-empty output lines and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
+    }
+  }
+
+  @Test
   def testDescribeOffsetsOfExistingGroup() {
     TestUtils.createOffsetsTopic(zkClient, servers)
 
@@ -139,7 +191,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupOffsets()
+      val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
@@ -159,7 +211,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupMembers(false)
+      val (state, assignments) = service.collectGroupMembers(group, false)
       state.contains("Stable") &&
         (assignments match {
           case Some(memberAssignments) =>
@@ -172,7 +224,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         })
     }, s"Expected a 'Stable' group status, rows and valid member information for group $group.", maxRetries = 3)
 
-    val (_, assignments) = service.collectGroupMembers(true)
+    val (_, assignments) = service.collectGroupMembers(group, true)
     assignments match {
       case None =>
         fail(s"Expected partition assignments for members of group $group")
@@ -193,7 +245,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val state = service.collectGroupState()
+      val state = service.collectGroupState(group)
       state.state == "Stable" &&
         state.numMembers == 1 &&
         state.assignmentStrategy == "range" &&
@@ -212,7 +264,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val state = service.collectGroupState()
+      val state = service.collectGroupState(group)
       state.state == "Stable" &&
         state.numMembers == 1 &&
         state.assignmentStrategy == "roundrobin" &&
@@ -233,14 +285,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       val service = getConsumerGroupService(cgcArgs)
 
       TestUtils.waitUntilTrue(() => {
-        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         output.trim.split("\n").length == 2 && error.isEmpty
       }, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'", maxRetries = 3)
 
       // stop the consumer so the group has no active member anymore
       executor.shutdown()
       TestUtils.waitUntilTrue(() => {
-        TestUtils.grabConsoleError(service.describeGroup()).contains(s"Consumer group '$group' has no active members.")
+        TestUtils.grabConsoleError(service.describeGroups()).contains(s"Consumer group '$group' has no active members.")
       }, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}", maxRetries = 3)
     }
   }
@@ -256,14 +308,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupOffsets()
+      val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Stable") && assignments.exists(_.exists(_.group == group))
     }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
 
-    val (result, succeeded) = TestUtils.computeUntilTrue(service.collectGroupOffsets()) {
+    val (result, succeeded) = TestUtils.computeUntilTrue(service.collectGroupOffsets(group)) {
       case (state, assignments) =>
         val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group))
         def assignment = testGroupAssignments.head
@@ -289,7 +341,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupMembers(false)
+      val (state, assignments) = service.collectGroupMembers(group, false)
       state.contains("Stable") && assignments.exists(_.exists(_.group == group))
     }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
 
@@ -297,7 +349,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupMembers(false)
+      val (state, assignments) = service.collectGroupMembers(group, false)
       state.contains("Empty") && assignments.isDefined && assignments.get.isEmpty
     }, s"Expected no member in describe group members results for group '$group'", maxRetries = 3)
   }
@@ -313,7 +365,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val state = service.collectGroupState()
+      val state = service.collectGroupState(group)
       state.state == "Stable" &&
         state.numMembers == 1 &&
         state.coordinator != null &&
@@ -324,7 +376,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      val state = service.collectGroupState()
+      val state = service.collectGroupState(group)
       state.state == "Empty" && state.numMembers == 0 && state.assignmentStrategy == ""
     }, s"Expected the group '$group' to become empty after the only member leaving.", maxRetries = 3)
   }
@@ -341,7 +393,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       val service = getConsumerGroupService(cgcArgs)
 
       TestUtils.waitUntilTrue(() => {
-        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         val expectedNumRows = if (describeTypeMembers.contains(describeType)) 3 else 2
         error.isEmpty && output.trim.split("\n").size == expectedNumRows
       }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
@@ -359,7 +411,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupOffsets()
+      val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
@@ -378,7 +430,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupMembers(false)
+      val (state, assignments) = service.collectGroupMembers(group, false)
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
@@ -387,7 +439,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count(_.assignment.nonEmpty) == 0
     }, "Expected rows for consumers with no assigned partitions in describe group results", maxRetries = 3)
 
-    val (state, assignments) = service.collectGroupMembers(true)
+    val (state, assignments) = service.collectGroupMembers(group, true)
     assertTrue("Expected additional columns in verbose version of describe members",
         state.contains("Stable") && assignments.get.count(_.assignment.nonEmpty) > 0)
   }
@@ -403,7 +455,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val state = service.collectGroupState()
+      val state = service.collectGroupState(group)
       state.state == "Stable" && state.numMembers == 2
     }, "Expected two consumers in describe group results", maxRetries = 3)
   }
@@ -422,7 +474,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       val service = getConsumerGroupService(cgcArgs)
 
       TestUtils.waitUntilTrue(() => {
-        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         val expectedNumRows = if (describeTypeState.contains(describeType)) 2 else 3
         error.isEmpty && output.trim.split("\n").size == expectedNumRows
       }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
@@ -442,7 +494,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupOffsets()
+      val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
@@ -464,7 +516,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupMembers(false)
+      val (state, assignments) = service.collectGroupMembers(group, false)
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
@@ -472,7 +524,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count{ x => x.group == group && x.numPartitions == 0 } == 0
     }, "Expected two rows (one row per consumer) in describe group members results.", maxRetries = 3)
 
-    val (state, assignments) = service.collectGroupMembers(true)
+    val (state, assignments) = service.collectGroupMembers(group, true)
     assertTrue("Expected additional columns in verbose version of describe members",
         state.contains("Stable") && assignments.get.count(_.assignment.isEmpty) == 0)
   }
@@ -490,7 +542,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val state = service.collectGroupState()
+      val state = service.collectGroupState(group)
       state.state == "Stable" && state.group == group && state.numMembers == 2
     }, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
   }
@@ -508,7 +560,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupOffsets()
+      val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2
     }, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
   }
@@ -527,7 +579,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     try {
-      TestUtils.grabConsoleOutputAndError(service.describeGroup())
+      TestUtils.grabConsoleOutputAndError(service.describeGroups())
       fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})")
     } catch {
       case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@@ -547,7 +599,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     try {
-      service.collectGroupOffsets()
+      service.collectGroupOffsets(group)
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
       case e : ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@@ -567,12 +619,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     try {
-      service.collectGroupMembers(false)
+      service.collectGroupMembers(group, false)
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
       case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException])// OK
         try {
-          service.collectGroupMembers(true)
+          service.collectGroupMembers(group, true)
           fail("The consumer group command should fail due to low initialization timeout (verbose)")
         } catch {
           case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@@ -593,7 +645,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     try {
-      service.collectGroupState()
+      service.collectGroupState(group)
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
       case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@@ -621,7 +673,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = service.collectGroupOffsets()
+      val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 007edd9..cf5e36f 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -71,38 +71,93 @@ class TimeConversionTests {
   * - export/import
   */
 class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-  
+
   val overridingProps = new Properties()
   val topic1 = "foo1"
   val topic2 = "foo2"
 
   override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false)
-      .map(KafkaConfig.fromProps(_, overridingProps))  
-  } 
+      .map(KafkaConfig.fromProps(_, overridingProps))
+  }
 
   @Test
   def testResetOffsetsNotExistingGroup() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics",
+    val group = "missing.group"
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-current", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
     // Make sure we got a coordinator
     TestUtils.waitUntilTrue(() => {
-      consumerGroupCommand.collectGroupState().coordinator.host() == "localhost"
-    }, "Can't find a coordinator.", maxRetries = 3)
-    val resetOffsets = consumerGroupCommand.resetOffsets()
+      consumerGroupCommand.collectGroupState(group).coordinator.host() == "localhost"
+    }, "Can't find a coordinator")
+    val resetOffsets = consumerGroupCommand.resetOffsets()(group)
     assertEquals(Map.empty, resetOffsets)
-    assertEquals(resetOffsets, committedOffsets(group = "missing.group"))
+    assertEquals(resetOffsets, committedOffsets(group = group))
   }
 
   @Test
   def testResetOffsetsExistingTopic(): Unit = {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic,
+    val group = "new.group"
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic,
+      "--to-offset", "50")
+    produceMessages(topic, 100)
+    resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
+    resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
+    resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50)
+  }
+
+  @Test
+  def testResetOffsetsExistingTopicSelectedGroups(): Unit = {
+    produceMessages(topic, 100)
+    // For ids 1..3: start a one-consumer group, await its progress, stop it, and keep its "--group" args
+    val groupArgs = (1 to 3).map { id =>
+      val groupId = this.group + id
+      val consumers = addConsumerGroupExecutor(numConsumers = 1, topic = topic, group = groupId)
+      awaitConsumerProgress(count = 100L, group = groupId)
+      consumers.shutdown()
+      Array("--group", groupId)
+    }.toArray.flatten
+    val baseArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--topic", topic,
+      "--to-offset", "50") ++ groupArgs
+    resetAndAssertOffsets(baseArgs, expectedOffset = 50, dryRun = true)
+    resetAndAssertOffsets(baseArgs ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
+    resetAndAssertOffsets(baseArgs ++ Array("--execute"), expectedOffset = 50)
+  }
+
+  @Test
+  def testResetOffsetsExistingTopicAllGroups(): Unit = {
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--all-groups", "--topic", topic,
       "--to-offset", "50")
     produceMessages(topic, 100)
+    for (group <- 1 to 3 map (group + _)) {
+      val executor = addConsumerGroupExecutor(numConsumers = 1, topic = topic, group = group)
+      awaitConsumerProgress(count = 100L, group = group)
+      executor.shutdown()
+    }
     resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
     resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
-    resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50, group = "new.group")
+    resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50)
+  }
+
+  @Test
+  def testResetOffsetsAllTopicsAllGroups(): Unit = {
+    val cliArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--all-groups", "--all-topics",
+      "--to-offset", "50")
+    val topicNames = (1 to 3).map(i => topic + i)
+    val groupNames = (1 to 3).map(i => group + i)
+    topicNames.foreach(name => produceMessages(name, 100))
+    // Every (topic, group) pair: start three consumers, await progress on that topic, then shut them down
+    topicNames.foreach { topicName =>
+      groupNames.foreach { groupName =>
+        val consumers = addConsumerGroupExecutor(numConsumers = 3, topic = topicName, group = groupName)
+        awaitConsumerProgress(topic = topicName, count = 100L, group = groupName)
+        consumers.shutdown()
+      }
+    }
+    resetAndAssertOffsets(cliArgs, expectedOffset = 50, dryRun = true, topics = topicNames)
+    resetAndAssertOffsets(cliArgs ++ Array("--dry-run"), expectedOffset = 50, dryRun = true, topics = topicNames)
+    resetAndAssertOffsets(cliArgs ++ Array("--execute"), expectedOffset = 50, topics = topicNames)
   }
 
   @Test
@@ -143,7 +198,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsByDuration() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--by-duration", "PT1M", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -151,7 +206,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsByDurationToEarliest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--by-duration", "PT0.1S", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 100)
   }
 
@@ -159,7 +214,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToEarliest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-earliest", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -167,7 +222,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToLatest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-latest", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 200)
   }
@@ -176,7 +231,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToCurrentOffset() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-current", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 100)
   }
@@ -185,7 +240,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToSpecificOffset() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-offset", "1", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 1)
   }
 
@@ -193,7 +248,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftPlus() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "50", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 150)
   }
@@ -202,7 +257,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftMinus() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "-50", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 50)
   }
@@ -211,7 +266,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftByLowerThanEarliest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "-150", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
@@ -220,7 +275,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftByHigherThanLatest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "150", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 200)
   }
@@ -229,7 +284,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToEarliestOnOneTopic() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic,
       "--to-earliest", "--execute")
-    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -242,7 +297,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       s"$topic:1", "--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
-    produceConsumeAndShutdown(topic, totalMessages = 100, numConsumers = 2)
+    produceConsumeAndShutdown(topic, group, totalMessages = 100, numConsumers = 2)
     val priorCommittedOffsets = committedOffsets(topic = topic)
 
     val tp0 = new TopicPartition(topic, 0)
@@ -264,13 +319,13 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       "--topic", topic2, "--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
-    produceConsumeAndShutdown(topic1, 100, 1)
-    produceConsumeAndShutdown(topic2, 100, 1)
+    produceConsumeAndShutdown(topic1, group, 100, 1)
+    produceConsumeAndShutdown(topic2, group, 100, 1)
 
     val tp1 = new TopicPartition(topic1, 0)
     val tp2 = new TopicPartition(topic2, 0)
 
-    val allResetOffsets = resetOffsets(consumerGroupCommand)
+    val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset())
     assertEquals(Map(tp1 -> 0L, tp2 -> 0L), allResetOffsets)
     assertEquals(Map(tp1 -> 0L), committedOffsets(topic1))
     assertEquals(Map(tp2 -> 0L), committedOffsets(topic2))
@@ -291,15 +346,15 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       s"$topic1:1", "--topic", s"$topic2:1", "--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
-    produceConsumeAndShutdown(topic1, 100, 2)
-    produceConsumeAndShutdown(topic2, 100, 2)
+    produceConsumeAndShutdown(topic1, group, 100, 2)
+    produceConsumeAndShutdown(topic2, group, 100, 2)
 
     val priorCommittedOffsets1 = committedOffsets(topic1)
     val priorCommittedOffsets2 = committedOffsets(topic2)
 
     val tp1 = new TopicPartition(topic1, 1)
     val tp2 = new TopicPartition(topic2, 1)
-    val allResetOffsets = resetOffsets(consumerGroupCommand)
+    val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset())
     assertEquals(Map(tp1 -> 0, tp2 -> 0), allResetOffsets)
 
     assertEquals(priorCommittedOffsets1 + (tp1 -> 0L), committedOffsets(topic1))
@@ -310,7 +365,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   }
 
   @Test
-  def testResetOffsetsExportImportPlan() {
+  // This one deals with old CSV export/import format for a single --group arg: "topic,partition,offset" to support old behavior
+  def testResetOffsetsExportImportPlanSingleGroupArg() {
     val topic = "bar"
     val tp0 = new TopicPartition(topic, 0)
     val tp1 = new TopicPartition(topic, 1)
@@ -320,22 +376,72 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       "--to-offset", "2", "--export")
     val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
-    produceConsumeAndShutdown(topic, 100, 2)
+    produceConsumeAndShutdown(topic = topic, group = group, totalMessages = 100, numConsumers = 2)
 
     val file = File.createTempFile("reset", ".csv")
     file.deleteOnExit()
 
     val exportedOffsets = consumerGroupCommand.resetOffsets()
     val bw = new BufferedWriter(new FileWriter(file))
-    bw.write(consumerGroupCommand.exportOffsetsToReset(exportedOffsets))
+    bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
     bw.close()
-    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets.mapValues(_.offset))
+    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets(group).mapValues(_.offset))
 
     val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--from-file", file.getCanonicalPath, "--dry-run")
     val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
     val importedOffsets = consumerGroupCommandExec.resetOffsets()
-    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets.mapValues(_.offset))
+    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets(group).mapValues(_.offset))
+
+    adminZkClient.deleteTopic(topic)
+  }
+
+  @Test
+  // This one deals with universal CSV export/import file format "group,topic,partition,offset",
+  // supporting multiple --group args or --all-groups arg
+  def testResetOffsetsExportImportPlan() {
+    val group1 = group + "1"
+    val group2 = group + "2"
+    val topic1 = "bar1"
+    val topic2 = "bar2"
+    val t1p0 = new TopicPartition(topic1, 0)
+    val t1p1 = new TopicPartition(topic1, 1)
+    val t2p0 = new TopicPartition(topic2, 0)
+    val t2p1 = new TopicPartition(topic2, 1)
+    createTopic(topic1, 2, 1)
+    createTopic(topic2, 2, 1)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group1, "--group", group2, "--all-topics",
+      "--to-offset", "2", "--export")
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+
+    produceConsumeAndShutdown(topic = topic1, group = group1, totalMessages = 100, numConsumers = 2)
+    produceConsumeAndShutdown(topic = topic2, group = group2, totalMessages = 100, numConsumers = 5)
+
+    val file = File.createTempFile("reset", ".csv")
+    file.deleteOnExit()
+
+    val exportedOffsets = consumerGroupCommand.resetOffsets()
+    val bw = new BufferedWriter(new FileWriter(file))
+    bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
+    bw.close()
+    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), exportedOffsets(group1).mapValues(_.offset))
+    assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), exportedOffsets(group2).mapValues(_.offset))
+
+    // Multiple --group's offset import
+    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group1, "--group", group2, "--all-topics",
+      "--from-file", file.getCanonicalPath, "--dry-run")
+    val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
+    val importedOffsets = consumerGroupCommandExec.resetOffsets()
+    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets(group1).mapValues(_.offset))
+    assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), importedOffsets(group2).mapValues(_.offset))
+
+    // Single --group offset import using "group,topic,partition,offset" csv format
+    val cgcArgsExec2 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group1, "--all-topics",
+      "--from-file", file.getCanonicalPath, "--dry-run")
+    val consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2)
+    val importedOffsets2 = consumerGroupCommandExec2.resetOffsets()
+    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets2(group1).mapValues(_.offset))
 
     adminZkClient.deleteTopic(topic)
   }
@@ -353,30 +459,38 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     TestUtils.produceMessages(servers, records, acks = 1)
   }
 
-  private def produceConsumeAndShutdown(topic: String, totalMessages: Int, numConsumers: Int = 1) {
+  private def produceConsumeAndShutdown(topic: String, group: String = group, totalMessages: Int, numConsumers: Int = 1) {
     produceMessages(topic, totalMessages)
-    val executor =  addConsumerGroupExecutor(numConsumers, topic)
-    awaitConsumerProgress(topic, totalMessages)
+    val executor = addConsumerGroupExecutor(numConsumers = numConsumers, topic = topic, group = group)
+    awaitConsumerProgress(topic, group, totalMessages)
     executor.shutdown()
   }
 
-  private def awaitConsumerProgress(topic: String = topic, count: Long): Unit = {
+  private def awaitConsumerProgress(topic: String = topic, group: String = group, count: Long): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val offsets = committedOffsets(topic).values
+      val offsets = committedOffsets(topic = topic, group = group).values
       count == offsets.sum
-    }, "Expected that consumer group has consumed all messages from topic/partition.")
+    }, "Expected that consumer group has consumed all messages from topic/partition. " +
+      s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")
   }
 
   private def resetAndAssertOffsets(args: Array[String],
-                                           expectedOffset: Long,
-                                           group: String = group,
-                                           dryRun: Boolean = false): Unit = {
+                                    expectedOffset: Long,
+                                    dryRun: Boolean = false,
+                                    topics: Seq[String] = Seq(topic)): Unit = {
     val consumerGroupCommand = getConsumerGroupService(args)
+    val expectedOffsets = topics.map(topic => topic -> Map(new TopicPartition(topic, 0) -> expectedOffset)).toMap
+    val resetOffsetsResultByGroup = resetOffsets(consumerGroupCommand)
+
     try {
-      val priorOffsets = committedOffsets(group = group)
-      val expectedOffsets = Map(new TopicPartition(topic, 0) -> expectedOffset)
-      assertEquals(expectedOffsets, resetOffsets(consumerGroupCommand))
-      assertEquals(if (dryRun) priorOffsets else expectedOffsets, committedOffsets(group = group))
+      for {
+        topic <- topics
+        (group, partitionInfo) <- resetOffsetsResultByGroup
+      } {
+        val priorOffsets = committedOffsets(topic = topic, group = group)
+        assertEquals(expectedOffsets(topic), partitionInfo.filter(partitionInfo => partitionInfo._1.topic() == topic).mapValues(_.offset))
+        assertEquals(if (dryRun) priorOffsets else expectedOffsets(topic), committedOffsets(topic = topic, group = group))
+      }
     } finally {
       consumerGroupCommand.close()
     }
@@ -386,14 +500,16 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
                                              expectedOffsets: Map[TopicPartition, Long],
                                              topic: String): Unit = {
     val allResetOffsets = resetOffsets(consumerGroupService)
-    allResetOffsets.foreach { case (tp, offset) =>
-      assertEquals(offset, expectedOffsets(tp))
+    for {
+      (group, offsetsInfo) <- allResetOffsets
+      (tp, offsetMetadata) <- offsetsInfo
+    } {
+      assertEquals(offsetMetadata.offset(), expectedOffsets(tp))
+      assertEquals(expectedOffsets, committedOffsets(topic, group))
     }
-    assertEquals(expectedOffsets, committedOffsets(topic))
   }
 
-  private def resetOffsets(consumerGroupService: ConsumerGroupService): Map[TopicPartition, Long] = {
-    consumerGroupService.resetOffsets().mapValues(_.offset)
+  private def resetOffsets(consumerGroupService: ConsumerGroupService) = {
+    consumerGroupService.resetOffsets()
   }
-
 }
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 9d8af44..e9d1f69 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -105,6 +105,8 @@ libs += [
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
+  jacksonDataformatCsv: "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:$versions.jackson",
+  jacksonModuleScala: "com.fasterxml.jackson.module:jackson-module-scala_$versions.baseScala:$versions.jackson",
   jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson",
   jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
   jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb",


Mime
View raw message