kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From boy...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989)
Date Tue, 07 Jul 2020 02:52:24 GMT
This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new c2f26a2  KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989)
c2f26a2 is described below

commit c2f26a282ae6ddfe1dbaa818b2d56412e56ce170
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Mon Jul 6 18:50:40 2020 -0700

    KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989)
    
    This is a bug fix for older admin clients that use static membership and call DescribeGroups.
    Making groupInstanceId ignorable ensures that such clients do not crash when handling the response.
    
    Added test coverage for DescribeGroups, and some side cleanups.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../common/requests/DescribeGroupsResponse.java    |  12 +--
 .../common/message/DescribeGroupsResponse.json     |   4 +-
 .../apache/kafka/common/message/MessageTest.java   | 107 ++++++++++++++++++---
 3 files changed, 100 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 4ba4ede..5836c7f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -78,15 +78,15 @@ public class DescribeGroupsResponse extends AbstractResponse {
         final String protocol,
         final List<DescribedGroupMember> members,
         final Set<Byte> authorizedOperations) {
-        DescribedGroup groupMetada = new DescribedGroup();
-        groupMetada.setGroupId(groupId)
+        DescribedGroup groupMetadata = new DescribedGroup();
+        groupMetadata.setGroupId(groupId)
             .setErrorCode(error.code())
             .setGroupState(state)
             .setProtocolType(protocolType)
             .setProtocolData(protocol)
             .setMembers(members)
             .setAuthorizedOperations(Utils.to32BitField(authorizedOperations));
-        return  groupMetada;
+        return  groupMetadata;
     }
 
     public static DescribedGroup groupMetadata(
@@ -97,15 +97,15 @@ public class DescribeGroupsResponse extends AbstractResponse {
         final String protocol,
         final List<DescribedGroupMember> members,
         final int authorizedOperations) {
-        DescribedGroup groupMetada = new DescribedGroup();
-        groupMetada.setGroupId(groupId)
+        DescribedGroup groupMetadata = new DescribedGroup();
+        groupMetadata.setGroupId(groupId)
             .setErrorCode(error.code())
             .setGroupState(state)
             .setProtocolType(protocolType)
             .setProtocolData(protocol)
             .setMembers(members)
             .setAuthorizedOperations(authorizedOperations);
-        return  groupMetada;
+        return  groupMetadata;
     }
 
     public DescribeGroupsResponseData data() {
diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
index 7e8a4d9..f195843 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
@@ -23,7 +23,7 @@
   //
   // Starting in version 3, brokers can send authorized operations.
   //
-  // Starting in version 4, the response will include group.instance.id info for members.
+  // Starting in version 4, the response will optionally include group.instance.id info for members.
   //
   // Version 5 is the first flexible version.
   "validVersions": "0-5",
@@ -49,7 +49,7 @@
         "about": "The group members.", "fields": [
         { "name": "MemberId", "type": "string", "versions": "0+",
           "about": "The member ID assigned by the group coordinator." },
-        { "name": "GroupInstanceId", "type": "string", "versions": "4+",
+        { "name": "GroupInstanceId", "type": "string", "versions": "4+", "ignorable": true,
           "nullableVersions": "4+", "default": "null",
           "about": "The unique identifier of the consumer instance provided by end user." },
         { "name": "ClientId", "type": "string", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index e8826a5..0672402 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.common.message;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
@@ -216,6 +218,86 @@ public final class MessageTest {
     }
 
     @Test
+    public void testDescribeGroupsRequestVersions() throws Exception {
+        testAllMessageRoundTrips(new DescribeGroupsRequestData()
+                .setGroups(Collections.singletonList("group"))
+                .setIncludeAuthorizedOperations(false));
+    }
+
+    @Test
+    public void testDescribeGroupsResponseVersions() throws Exception {
+        DescribedGroupMember baseMember = new DescribedGroupMember()
+            .setMemberId(memberId);
+
+        DescribedGroup baseGroup = new DescribedGroup()
+                                       .setGroupId("group")
+                                       .setGroupState("Stable").setErrorCode(Errors.NONE.code())
+                                       .setMembers(Collections.singletonList(baseMember))
+                                       .setProtocolType("consumer");
+        DescribeGroupsResponseData baseResponse = new DescribeGroupsResponseData()
+                                                      .setGroups(Collections.singletonList(baseGroup));
+        testAllMessageRoundTrips(baseResponse);
+
+        testAllMessageRoundTripsFromVersion((short) 1, baseResponse.setThrottleTimeMs(10));
+
+        baseGroup.setAuthorizedOperations(1);
+        testAllMessageRoundTripsFromVersion((short) 3, baseResponse);
+
+        baseMember.setGroupInstanceId(instanceId);
+        testAllMessageRoundTripsFromVersion((short) 4, baseResponse);
+    }
+
+    @Test
+    public void testGroupInstanceIdIgnorableInDescribeGroupsResponse() throws Exception {
+        DescribeGroupsResponseData responseWithGroupInstanceId =
+            new DescribeGroupsResponseData()
+                .setGroups(Collections.singletonList(
+                    new DescribedGroup()
+                        .setGroupId("group")
+                        .setGroupState("Stable")
+                        .setErrorCode(Errors.NONE.code())
+                        .setMembers(Collections.singletonList(
+                            new DescribedGroupMember()
+                                .setMemberId(memberId)
+                                .setGroupInstanceId(instanceId)))
+                        .setProtocolType("consumer")
+                ));
+
+        DescribeGroupsResponseData expectedResponse = new DescribeGroupsResponseData(
+            responseWithGroupInstanceId.toStruct(responseWithGroupInstanceId.highestSupportedVersion()),
+            responseWithGroupInstanceId.highestSupportedVersion());
+        // Unset GroupInstanceId
+        expectedResponse.groups().get(0).members().get(0).setGroupInstanceId(null);
+
+        testAllMessageRoundTripsBeforeVersion((short) 4, responseWithGroupInstanceId, expectedResponse);
+    }
+
+    @Test
+    public void testThrottleTimeIgnorableInDescribeGroupsResponse() throws Exception {
+        DescribeGroupsResponseData responseWithGroupInstanceId =
+            new DescribeGroupsResponseData()
+                .setGroups(Collections.singletonList(
+                    new DescribedGroup()
+                        .setGroupId("group")
+                        .setGroupState("Stable")
+                        .setErrorCode(Errors.NONE.code())
+                        .setMembers(Collections.singletonList(
+                            new DescribedGroupMember()
+                                .setMemberId(memberId)))
+                        .setProtocolType("consumer")
+                ))
+                .setThrottleTimeMs(10);
+
+        DescribeGroupsResponseData expectedResponse = new DescribeGroupsResponseData(
+            responseWithGroupInstanceId.toStruct(responseWithGroupInstanceId.highestSupportedVersion()),
+            responseWithGroupInstanceId.highestSupportedVersion());
+        // Unset throttle time
+        expectedResponse.setThrottleTimeMs(0);
+
+        testAllMessageRoundTripsBeforeVersion((short) 1, responseWithGroupInstanceId, expectedResponse);
+    }
+
+    @Test
     public void testOffsetForLeaderEpochVersions() throws Exception {
         // Version 2 adds optional current leader epoch
         OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionDataNoCurrentEpoch =
@@ -244,7 +326,6 @@ public final class MessageTest {
         testAllMessageRoundTripsBeforeVersion((short) 3,
                 new OffsetForLeaderEpochRequestData().setReplicaId(5),
                 new OffsetForLeaderEpochRequestData().setReplicaId(-2));
-
     }
 
     @Test
@@ -699,7 +780,7 @@ public final class MessageTest {
      * Test that the JSON request files match the schemas accessible through the ApiKey class.
      */
     @Test
-    public void testRequestSchemas() throws Exception {
+    public void testRequestSchemas() {
         for (ApiKeys apiKey : ApiKeys.values()) {
             Schema[] manualSchemas = apiKey.requestSchemas;
             Schema[] generatedSchemas = ApiMessageType.fromApiKey(apiKey.id).requestSchemas();
@@ -755,13 +836,9 @@ public final class MessageTest {
                 return true;
             }
             if (type.getClass().equals(Type.RECORDS.getClass())) {
-                if (other.type.getClass().equals(Type.NULLABLE_BYTES.getClass())) {
-                    return true;
-                }
+                return other.type.getClass().equals(Type.NULLABLE_BYTES.getClass());
             } else if (type.getClass().equals(Type.NULLABLE_BYTES.getClass())) {
-                if (other.type.getClass().equals(Type.RECORDS.getClass())) {
-                    return true;
-                }
+                return other.type.getClass().equals(Type.RECORDS.getClass());
             }
             return false;
         }
@@ -826,7 +903,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testDefaultValues() throws Exception {
+    public void testDefaultValues() {
         verifyWriteRaisesUve((short) 0, "validateOnly",
             new CreateTopicsRequestData().setValidateOnly(true));
         verifyWriteSucceeds((short) 0,
@@ -839,7 +916,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testNonIgnorableFieldWithDefaultNull() throws Exception {
+    public void testNonIgnorableFieldWithDefaultNull() {
         // Test non-ignorable string field `groupInstanceId` with default null
         verifyWriteRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData()
                 .setGroupId("groupId")
@@ -858,7 +935,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testWriteNullForNonNullableFieldRaisesException() throws Exception {
+    public void testWriteNullForNonNullableFieldRaisesException() {
         CreateTopicsRequestData createTopics = new CreateTopicsRequestData().setTopics(null);
         for (short i = (short) 0; i <= createTopics.highestSupportedVersion(); i++) {
             verifyWriteRaisesNpe(i, createTopics);
@@ -868,7 +945,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testUnknownTaggedFields() throws Exception {
+    public void testUnknownTaggedFields() {
         CreateTopicsRequestData createTopics = new CreateTopicsRequestData();
         verifyWriteSucceeds((short) 6, createTopics);
         RawTaggedField field1000 = new RawTaggedField(1000, new byte[] {0x1, 0x2, 0x3});
@@ -877,7 +954,7 @@ public final class MessageTest {
         verifyWriteSucceeds((short) 6, createTopics);
     }
 
-    private void verifyWriteRaisesNpe(short version, Message message) throws Exception {
+    private void verifyWriteRaisesNpe(short version, Message message) {
         ObjectSerializationCache cache = new ObjectSerializationCache();
         assertThrows(NullPointerException.class, () -> {
             int size = message.size(cache, version);
@@ -889,7 +966,7 @@ public final class MessageTest {
 
     private void verifyWriteRaisesUve(short version,
                                       String problemText,
-                                     Message message) throws Exception {
+                                      Message message) {
         ObjectSerializationCache cache = new ObjectSerializationCache();
         UnsupportedVersionException e =
             assertThrows(UnsupportedVersionException.class, () -> {
@@ -903,7 +980,7 @@ public final class MessageTest {
                 e.getMessage().contains(problemText));
     }
 
-    private void verifyWriteSucceeds(short version, Message message) throws Exception {
+    private void verifyWriteSucceeds(short version, Message message) {
         ObjectSerializationCache cache = new ObjectSerializationCache();
         int size = message.size(cache, version);
         ByteBuffer buf = ByteBuffer.allocate(size * 2);


Mime
View raw message