kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8954; Topic existence check is wrongly implemented in the DeleteOffset API (KIP-496) (#7406)
Date Wed, 09 Oct 2019 15:53:03 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 4596ca0  KAFKA-8954; Topic existence check is wrongly implemented in the DeleteOffset
API (KIP-496) (#7406)
4596ca0 is described below

commit 4596ca0a8036b46d27a8f449a2068bf89ad00033
Author: David Jacot <djacot@confluent.io>
AuthorDate: Wed Oct 9 17:36:49 2019 +0200

    KAFKA-8954; Topic existence check is wrongly implemented in the DeleteOffset API (KIP-496)
(#7406)
    
    This patch changes the way topic existence is checked in the DeleteOffset API. Previously,
it was relying on the committed offsets. Now, it relies on the metadata cache which is better.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 14 +----
 core/src/main/scala/kafka/server/KafkaApis.scala   | 32 +++++++----
 .../coordinator/group/GroupCoordinatorTest.scala   | 40 ++++++-------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 65 +++++++++++++++++++++-
 4 files changed, 104 insertions(+), 47 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 64884ec..22f15f9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -565,22 +565,14 @@ class GroupCoordinator(val brokerId: Int,
                     Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR
 
                 case Empty =>
-                  val (knownPartitions, unknownPartitions) =
-                    partitions.partition(tp => group.offset(tp).nonEmpty)
-
-                  partitionEligibleForDeletion = knownPartitions
-                  partitionErrors = unknownPartitions.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION).toMap
+                  partitionEligibleForDeletion = partitions
 
                 case PreparingRebalance | CompletingRebalance | Stable if group.isConsumerGroup
=>
-                  val (knownPartitions, unknownPartitions) =
-                    partitions.partition(tp => group.offset(tp).nonEmpty)
-
                   val (consumed, notConsumed) =
-                    knownPartitions.partition(tp => group.isSubscribedToTopic(tp.topic()))
+                    partitions.partition(tp => group.isSubscribedToTopic(tp.topic()))
 
                   partitionEligibleForDeletion = notConsumed
-                  partitionErrors = consumed.map(_ -> Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
++
-                    unknownPartitions.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION).toMap
+                  partitionErrors = consumed.map(_ -> Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
 
                 case _ =>
                   groupError = Errors.NON_EMPTY_GROUP
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c7747b6..9a0ebd7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2688,27 +2688,35 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupId = offsetDeleteRequest.data.groupId
 
     if (authorize(request, DELETE, GROUP, groupId)) {
-      val topicPartitions = offsetDeleteRequest.data.topics.asScala.flatMap { topic =>
-        topic.partitions.asScala.map { partition =>
-          new TopicPartition(topic.name, partition.partitionIndex)
+      val authorizedTopics = filterAuthorized(request, READ, TOPIC,
+        offsetDeleteRequest.data.topics.asScala.map(_.name).toSeq)
+
+      val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
+      val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
+
+      for (topic <- offsetDeleteRequest.data.topics.asScala) {
+        for (partition <- topic.partitions.asScala) {
+          val tp = new TopicPartition(topic.name, partition.partitionIndex)
+          if (!authorizedTopics.contains(topic.name))
+            topicPartitionErrors(tp) = Errors.TOPIC_AUTHORIZATION_FAILED
+          else if (!metadataCache.contains(tp))
+            topicPartitionErrors(tp) = Errors.UNKNOWN_TOPIC_OR_PARTITION
+          else
+            topicPartitions += tp
         }
-      }.toSeq
-
-      val authorizedTopics = filterAuthorized(request, READ, TOPIC, topicPartitions.map(_.topic))
-      val (authorizedTopicPartitions, unauthorizedTopicPartitions) = topicPartitions.partition
{ topicPartition =>
-        authorizedTopics.contains(topicPartition.topic)
       }
 
-      val unauthorizedTopicPartitionsErrors = unauthorizedTopicPartitions.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED)
-      val (groupError, authorizedTopicPartitionsErrors) = groupCoordinator.handleDeleteOffsets(groupId,
authorizedTopicPartitions)
-      val topicPartitionsErrors = unauthorizedTopicPartitionsErrors ++ authorizedTopicPartitionsErrors
+      val (groupError, authorizedTopicPartitionsErrors) = groupCoordinator.handleDeleteOffsets(
+        groupId, topicPartitions)
+
+      topicPartitionErrors ++= authorizedTopicPartitionsErrors
 
       sendResponseMaybeThrottle(request, requestThrottleMs => {
         if (groupError != Errors.NONE)
           offsetDeleteRequest.getErrorResponse(requestThrottleMs, groupError)
         else {
           val topics = new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
-          topicPartitionsErrors.groupBy(_._1.topic).map { case (topic, topicPartitions) =>
+          topicPartitionErrors.groupBy(_._1.topic).map { case (topic, topicPartitions) =>
             val partitions = new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
             topicPartitions.map { case (topicPartition, error) =>
               partitions.add(
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 91a963d..cd85e0a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -2827,7 +2827,7 @@ class GroupCoordinatorTest {
     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
 
     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
-    assert(topics.isEmpty)
+    assertTrue(topics.isEmpty)
   }
 
   @Test
@@ -2838,7 +2838,7 @@ class GroupCoordinatorTest {
     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
 
     assertEquals(Errors.NON_EMPTY_GROUP, groupError)
-    assert(topics.isEmpty)
+    assertTrue(topics.isEmpty)
   }
 
   @Test
@@ -2868,7 +2868,7 @@ class GroupCoordinatorTest {
     val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
     verifyLeaveGroupResult(leaveGroupResults)
 
-    assert(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
+    assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2882,7 +2882,7 @@ class GroupCoordinatorTest {
     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0))
 
     assertEquals(Errors.NONE, groupError)
-    assert(topics.size == 1)
+    assertEquals(1, topics.size)
     assertEquals(Some(Errors.NONE), topics.get(t1p0))
 
     val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, Some(Seq(t1p0,
t2p0)))
@@ -2900,21 +2900,19 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId,
Map.empty)
     assertEquals(Errors.NONE, syncGroupResult._2)
 
-    val t1p0 = new TopicPartition("foo", 0)
-    val t2p0 = new TopicPartition("bar", 0)
+    val tp = new TopicPartition("foo", 0)
     val offset = offsetAndMetadata(37)
 
     EasyMock.reset(replicaManager)
     val validOffsetCommitResult = commitOffsets(groupId, joinGroupResult.memberId, joinGroupResult.generationId,
-      Map(t1p0 -> offset))
-    assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
+      Map(tp -> offset))
+    assertEquals(Errors.NONE, validOffsetCommitResult(tp))
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t2p0))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
 
     assertEquals(Errors.NONE, groupError)
-    assert(topics.size == 2)
-    assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(t1p0))
-    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), topics.get(t2p0))
+    assertEquals(1, topics.size)
+    assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(tp))
   }
 
   @Test
@@ -2927,7 +2925,7 @@ class GroupCoordinatorTest {
     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp))
 
     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError)
-    assert(topics.size == 0)
+    assertTrue(topics.isEmpty)
   }
 
   @Test
@@ -2943,7 +2941,6 @@ class GroupCoordinatorTest {
 
     val t1p0 = new TopicPartition("foo", 0)
     val t2p0 = new TopicPartition("bar", 0)
-    val t3p0 = new TopicPartition("unknown", 0)
     val offset = offsetAndMetadata(37)
 
     EasyMock.reset(replicaManager)
@@ -2957,7 +2954,7 @@ class GroupCoordinatorTest {
     val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
     verifyLeaveGroupResult(leaveGroupResults)
 
-    assert(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
+    assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2968,12 +2965,11 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t3p0))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0))
 
     assertEquals(Errors.NONE, groupError)
-    assert(topics.size == 2)
+    assertEquals(1, topics.size)
     assertEquals(Some(Errors.NONE), topics.get(t1p0))
-    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), topics.get(t3p0))
 
     val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, Some(Seq(t1p0,
t2p0)))
 
@@ -2997,7 +2993,6 @@ class GroupCoordinatorTest {
 
     val t1p0 = new TopicPartition("foo", 0)
     val t2p0 = new TopicPartition("bar", 0)
-    val t3p0 = new TopicPartition("unknown", 0)
     val offset = offsetAndMetadata(37)
 
     EasyMock.reset(replicaManager)
@@ -3006,7 +3001,7 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
     assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
 
-    assert(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Stable)))
+    assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Stable)))
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -3017,13 +3012,12 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t2p0,
t3p0))
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(t1p0, t2p0))
 
     assertEquals(Errors.NONE, groupError)
-    assert(topics.size == 3)
+    assertEquals(2, topics.size)
     assertEquals(Some(Errors.NONE), topics.get(t1p0))
     assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(t2p0))
-    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), topics.get(t3p0))
 
     val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, Some(Seq(t1p0,
t2p0)))
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bf1a8c4..66bed52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -46,9 +46,10 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata,
_}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
-import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData,
OffsetCommitResponseData, SyncGroupRequestData, TxnOffsetCommitRequestData}
+import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData,
OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint,
UpdateMetadataPartitionState}
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.server.authorizer.Authorizer
@@ -393,6 +394,68 @@ class KafkaApisTest {
     testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   }
 
+  @Test
+  def testOffsetDeleteWithInvalidPartition(): Unit = {
+    val group = "groupId"
+    val topic = "topic"
+    setupBasicMetadataCache(topic, numPartitions = 1)
+
+    def checkInvalidPartition(invalidPartitionId: Int): Unit = {
+      EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+      val topics = new OffsetDeleteRequestTopicCollection()
+      topics.add(new OffsetDeleteRequestTopic()
+        .setName(topic)
+        .setPartitions(Collections.singletonList(
+          new OffsetDeleteRequestPartition().setPartitionIndex(invalidPartitionId))))
+      val (offsetDeleteRequest, request) = buildRequest(new OffsetDeleteRequest.Builder(
+        new OffsetDeleteRequestData()
+          .setGroupId(group)
+          .setTopics(topics)
+      ))
+
+      val capturedResponse = expectNoThrottling()
+      EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
+        .andReturn((Errors.NONE, Map.empty))
+      EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+      createKafkaApis().handleOffsetDeleteRequest(request)
+
+      val response = readResponse(ApiKeys.OFFSET_DELETE, offsetDeleteRequest, capturedResponse)
+        .asInstanceOf[OffsetDeleteResponse]
+
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+        Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode()))
+    }
+
+    checkInvalidPartition(-1)
+    checkInvalidPartition(1) // topic has only one partition
+  }
+
+  @Test
+  def testOffsetDeleteWithInvalidGroup(): Unit = {
+    val group = "groupId"
+
+    EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+    val (offsetDeleteRequest, request) = buildRequest(new OffsetDeleteRequest.Builder(
+      new OffsetDeleteRequestData()
+        .setGroupId(group)
+    ))
+
+    val capturedResponse = expectNoThrottling()
+    EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
+      .andReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty))
+    EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis().handleOffsetDeleteRequest(request)
+
+    val response = readResponse(ApiKeys.OFFSET_DELETE, offsetDeleteRequest, capturedResponse)
+      .asInstanceOf[OffsetDeleteResponse]
+
+    assertEquals(Errors.GROUP_ID_NOT_FOUND, Errors.forCode(response.data.errorCode()))
+  }
+
   private def testListOffsetFailedGetLeaderReplica(error: Errors): Unit = {
     val tp = new TopicPartition("foo", 0)
     val isolationLevel = IsolationLevel.READ_UNCOMMITTED


Mime
View raw message