kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh (#10858)
Date Tue, 29 Jun 2021 07:02:30 GMT
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d95c191  KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh (#10858)
d95c191 is described below

commit d95c1919458bd5621774394c9eb61698ce2187b8
Author: Ignacio Acuña Frías <31544929+IgnacioAcunaF@users.noreply.github.com>
AuthorDate: Tue Jun 29 03:00:56 2021 -0400

    KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh (#10858)
    
    This patch fixes the `ConsumerGroupCommand` to correctly handle missing offsets, which are returned as `null` by the admin API.
    
    Reviewers: David Jacot <djacot@confluent.io>
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 11 ++-
 .../kafka/admin/ConsumerGroupServiceTest.scala     | 82 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 6c6090f..74b7224 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -560,27 +560,24 @@ object ConsumerGroupCommand extends Logging {
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]()
++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
         val committedOffsets = getCommittedOffsets(groupId)
+        // The admin client returns `null` as a value to indicate that there is no committed offset for a partition.
+        def getPartitionOffset(tp: TopicPartition): Option[Long] = committedOffsets.get(tp).filter(_
!= null).map(_.offset)
         var assignedTopicPartitions = ListBuffer[TopicPartition]()
         val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
           .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap
{ consumerSummary =>
           val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
           assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
-          val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
-            .map { topicPartition =>
-              topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
-            }.toMap
           collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
-            partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
+            getPartitionOffset, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
             Some(s"${consumerSummary.clientId}"))
         }
-
         val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp)
}
         val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
           collectConsumerAssignment(
             groupId,
             Option(consumerGroup.coordinator),
             unassignedPartitions.keySet.toSeq,
-            unassignedPartitions.map { case (tp, offset) => tp -> Some(offset.offset)
},
+            getPartitionOffset,
             Some(MISSING_COLUMN_VALUE),
             Some(MISSING_COLUMN_VALUE),
             Some(MISSING_COLUMN_VALUE)).toSeq
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index 86bf674..3b3b781 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
+import org.mockito.ArgumentMatcher
 
 import scala.jdk.CollectionConverters._
 
@@ -63,6 +64,87 @@ class ConsumerGroupServiceTest {
   }
 
   @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe",
"--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic partitions get valid OffsetAndMetadata values, others get null values (negative integers), and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis,
Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1",
"host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition,
OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) => assignedTopicPartitions.contains(tp)
}.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) => unassignedTopicPartitions.contains(tp)
}.asJava))
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    val returnedOffsets = assignments.map { results =>
+      results.map { assignment =>
+        new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset
+      }.toMap
+    }.getOrElse(Map.empty)
+
+    val expectedOffsets = Map(
+      testTopicPartition0 -> None,
+      testTopicPartition1 -> Some(100),
+      testTopicPartition2 -> None,
+      testTopicPartition3 -> Some(100),
+      testTopicPartition4 -> Some(100),
+      testTopicPartition5 -> None
+    )
+    assertEquals(Some("Stable"), state)
+    assertEquals(expectedOffsets, returnedOffsets)
+
+    verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any())
+    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+    verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
any())
+    verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
any())
+  }
+
+  @Test
   def testAdminRequestsForResetOffsets(): Unit = {
     val args = Seq("--bootstrap-server", "localhost:9092", "--group", group, "--reset-offsets",
"--to-latest")
     val topicsWithoutPartitionsSpecified = topics.tail

Mime
View raw message