kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7141; Consumer group describe should include groups with no committed offsets (#5356)
Date Fri, 20 Jul 2018 15:52:10 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf237fa  KAFKA-7141; Consumer group describe should include groups with no committed
offsets (#5356)
bf237fa is described below

commit bf237fa7c576bd141d78fdea9f17f65ea269c290
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Fri Jul 20 23:52:04 2018 +0800

    KAFKA-7141; Consumer group describe should include groups with no committed offsets (#5356)
    
    Currently, if a consumer group never commits offsets, ConsumerGroupCommand will not include
it in the describe output even if the member assignment is valid. Instead, the tool should
be able to describe the group information showing empty current_offset and LAG.
    
    Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Vahid Hashemian <vahidhashemian@us.ibm.com>,
Jason Gustafson <jason@confluent.io>
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 21 +++++++---------
 .../kafka/admin/ConsumerGroupCommandTest.scala     | 17 +++++++------
 .../kafka/admin/DescribeConsumerGroupTest.scala    | 28 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 48c2cff..1d61720 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -27,7 +27,6 @@ import kafka.utils._
 import org.apache.kafka.clients.{CommonClientConfigs, admin}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
@@ -35,7 +34,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, Set}
-import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
 object ConsumerGroupCommand extends Logging {
@@ -340,20 +338,19 @@ object ConsumerGroupCommand extends Logging {
       val state = consumerGroup.state
       val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
       var assignedTopicPartitions = ListBuffer[TopicPartition]()
-      val rowsWithConsumer = if (committedOffsets.isEmpty) List[PartitionAssignmentState]() else consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
-        .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size)
-        .flatMap { consumerSummary =>
-          val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
-          assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
-          val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
-            .map { topicPartition =>
-              topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
-            }.toMap
+      val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
+        .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
+        val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
+        assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
+        val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
+          .map { topicPartition =>
+            topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
+          }.toMap
 
         collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
           partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
           Some(s"${consumerSummary.clientId}"))
-        }
+      }
 
       val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
         case (topicPartition, offset) =>
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 072f29a..d5eea98 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -92,8 +92,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
   def addConsumerGroupExecutor(numConsumers: Int,
                                topic: String = topic,
                                group: String = group,
-                               strategy: String = classOf[RangeAssignor].getName): ConsumerGroupExecutor = {
-    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy)
+                               strategy: String = classOf[RangeAssignor].getName,
+                               customPropsOpt: Option[Properties] = None): ConsumerGroupExecutor = {
+    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy, customPropsOpt)
     addExecutor(executor)
     executor
   }
@@ -114,9 +115,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
 
 object ConsumerGroupCommandTest {
 
-  abstract class AbstractConsumerRunnable(broker: String, groupId: String) extends Runnable {
+  abstract class AbstractConsumerRunnable(broker: String, groupId: String, customPropsOpt: Option[Properties] = None) extends Runnable {
     val props = new Properties
     configure(props)
+    customPropsOpt.foreach(props.asScala ++= _.asScala)
     val consumer = new KafkaConsumer(props)
 
     def configure(props: Properties): Unit = {
@@ -145,8 +147,8 @@ object ConsumerGroupCommandTest {
     }
   }
 
-  class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String)
-    extends AbstractConsumerRunnable(broker, groupId) {
+  class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String, customPropsOpt: Option[Properties] = None)
+    extends AbstractConsumerRunnable(broker, groupId, customPropsOpt) {
 
     override def configure(props: Properties): Unit = {
       super.configure(props)
@@ -182,11 +184,12 @@ object ConsumerGroupCommandTest {
     }
   }
 
-  class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String)
+  class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String,
+                              customPropsOpt: Option[Properties] = None)
     extends AbstractConsumerGroupExecutor(numConsumers) {
 
     for (_ <- 1 to numConsumers) {
-      submit(new ConsumerRunnable(broker, groupId, topic, strategy))
+      submit(new ConsumerRunnable(broker, groupId, topic, strategy, customPropsOpt))
     }
 
   }
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index dce4cf9..88f9f4a 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -16,9 +16,11 @@
  */
 package kafka.admin
 
+import java.util.Properties
+
 import joptsimple.OptionException
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.RoundRobinAssignor
+import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{TimeoutException}
 import org.junit.Assert._
@@ -605,5 +607,29 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     fail("Expected an error due to presence of unrecognized --new-consumer option")
   }
 
+  @Test
+  def testDescribeNonOffsetCommitGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    val customProps = new Properties
+    // create a consumer group that never commits offsets
+    customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = Some(customProps))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupOffsets()
+      state.contains("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 1 &&
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+    }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.")
+  }
+
 }
 


Mime
View raw message