From commits-return-15029-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Tue Jul 7 03:13:53 2020 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id A76E01A4C9 for ; Tue, 7 Jul 2020 03:13:52 +0000 (UTC) Received: (qmail 95313 invoked by uid 500); 7 Jul 2020 03:13:52 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 95286 invoked by uid 500); 7 Jul 2020 03:13:51 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 95277 invoked by uid 99); 7 Jul 2020 03:13:51 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Jul 2020 03:13:51 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 78866890B8; Tue, 7 Jul 2020 03:13:51 +0000 (UTC) Date: Tue, 07 Jul 2020 03:13:47 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.6 updated: KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <159409162423.3077.3080996317298301162@gitbox.apache.org> From: boyang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.6 X-Git-Reftype: branch X-Git-Oldrev: 76f490e785eab9a741be65e0275e9dd76a2e1178 X-Git-Newrev: 5281df14218cedbf8bf26145fc5418962fd39326 X-Git-Rev: 5281df14218cedbf8bf26145fc5418962fd39326 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git 
repository. boyang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 5281df1 KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989) 5281df1 is described below commit 5281df14218cedbf8bf26145fc5418962fd39326 Author: Boyang Chen AuthorDate: Mon Jul 6 18:50:40 2020 -0700 KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989) This is a bug fix for older admin clients that use static membership and call DescribeGroups. Making groupInstanceId ignorable prevents a crash when the response is handled. Added test coverage for DescribeGroups, plus some side cleanups. Reviewers: Jason Gustafson --- .../common/requests/DescribeGroupsResponse.java | 12 +-- .../common/message/DescribeGroupsResponse.json | 4 +- .../apache/kafka/common/message/MessageTest.java | 107 ++++++++++++++++++--- 3 files changed, 100 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 4ba4ede..5836c7f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -78,15 +78,15 @@ public class DescribeGroupsResponse extends AbstractResponse { final String protocol, final List members, final Set authorizedOperations) { - DescribedGroup groupMetada = new DescribedGroup(); - groupMetada.setGroupId(groupId) + DescribedGroup groupMetadata = new DescribedGroup(); + groupMetadata.setGroupId(groupId) .setErrorCode(error.code()) .setGroupState(state) .setProtocolType(protocolType) .setProtocolData(protocol) .setMembers(members) .setAuthorizedOperations(Utils.to32BitField(authorizedOperations)); - return groupMetada; + return groupMetadata; } public static DescribedGroup groupMetadata( @@ 
-97,15 +97,15 @@ public class DescribeGroupsResponse extends AbstractResponse { final String protocol, final List members, final int authorizedOperations) { - DescribedGroup groupMetada = new DescribedGroup(); - groupMetada.setGroupId(groupId) + DescribedGroup groupMetadata = new DescribedGroup(); + groupMetadata.setGroupId(groupId) .setErrorCode(error.code()) .setGroupState(state) .setProtocolType(protocolType) .setProtocolData(protocol) .setMembers(members) .setAuthorizedOperations(authorizedOperations); - return groupMetada; + return groupMetadata; } public DescribeGroupsResponseData data() { diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json index 7e8a4d9..f195843 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json +++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json @@ -23,7 +23,7 @@ // // Starting in version 3, brokers can send authorized operations. // - // Starting in version 4, the response will include group.instance.id info for members. + // Starting in version 4, the response will optionally include group.instance.id info for members. // // Version 5 is the first flexible version. "validVersions": "0-5", @@ -49,7 +49,7 @@ "about": "The group members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID assigned by the group coordinator." }, - { "name": "GroupInstanceId", "type": "string", "versions": "4+", + { "name": "GroupInstanceId", "type": "string", "versions": "4+", "ignorable": true, "nullableVersions": "4+", "default": "null", "about": "The unique identifier of the consumer instance provided by end user." 
}, { "name": "ClientId", "type": "string", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index e01fda9..d640f44 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; @@ -219,6 +221,86 @@ public final class MessageTest { } @Test + public void testDescribeGroupsRequestVersions() throws Exception { + testAllMessageRoundTrips(new DescribeGroupsRequestData() + .setGroups(Collections.singletonList("group")) + .setIncludeAuthorizedOperations(false)); + } + + @Test + public void testDescribeGroupsResponseVersions() throws Exception { + DescribedGroupMember baseMember = new DescribedGroupMember() + .setMemberId(memberId); + + DescribedGroup baseGroup = new DescribedGroup() + .setGroupId("group") + .setGroupState("Stable").setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList(baseMember)) + .setProtocolType("consumer"); + DescribeGroupsResponseData baseResponse = new DescribeGroupsResponseData() + .setGroups(Collections.singletonList(baseGroup)); + 
testAllMessageRoundTrips(baseResponse); + + testAllMessageRoundTripsFromVersion((short) 1, baseResponse.setThrottleTimeMs(10)); + + baseGroup.setAuthorizedOperations(1); + testAllMessageRoundTripsFromVersion((short) 3, baseResponse); + + baseMember.setGroupInstanceId(instanceId); + testAllMessageRoundTripsFromVersion((short) 4, baseResponse); + } + + @Test + public void testGroupInstanceIdIgnorableInDescribeGroupsResponse() throws Exception { + DescribeGroupsResponseData responseWithGroupInstanceId = + new DescribeGroupsResponseData() + .setGroups(Collections.singletonList( + new DescribedGroup() + .setGroupId("group") + .setGroupState("Stable") + .setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList( + new DescribedGroupMember() + .setMemberId(memberId) + .setGroupInstanceId(instanceId))) + .setProtocolType("consumer") + )); + + DescribeGroupsResponseData expectedResponse = new DescribeGroupsResponseData( + responseWithGroupInstanceId.toStruct(responseWithGroupInstanceId.highestSupportedVersion()), + responseWithGroupInstanceId.highestSupportedVersion()); + // Unset GroupInstanceId + expectedResponse.groups().get(0).members().get(0).setGroupInstanceId(null); + + testAllMessageRoundTripsBeforeVersion((short) 4, responseWithGroupInstanceId, expectedResponse); + } + + @Test + public void testThrottleTimeIgnorableInDescribeGroupsResponse() throws Exception { + DescribeGroupsResponseData responseWithGroupInstanceId = + new DescribeGroupsResponseData() + .setGroups(Collections.singletonList( + new DescribedGroup() + .setGroupId("group") + .setGroupState("Stable") + .setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList( + new DescribedGroupMember() + .setMemberId(memberId))) + .setProtocolType("consumer") + )) + .setThrottleTimeMs(10); + + DescribeGroupsResponseData expectedResponse = new DescribeGroupsResponseData( + responseWithGroupInstanceId.toStruct(responseWithGroupInstanceId.highestSupportedVersion()), + 
responseWithGroupInstanceId.highestSupportedVersion()); + // Unset throttle time + expectedResponse.setThrottleTimeMs(0); + + testAllMessageRoundTripsBeforeVersion((short) 1, responseWithGroupInstanceId, expectedResponse); + } + + @Test public void testOffsetForLeaderEpochVersions() throws Exception { // Version 2 adds optional current leader epoch OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionDataNoCurrentEpoch = @@ -247,7 +329,6 @@ public final class MessageTest { testAllMessageRoundTripsBeforeVersion((short) 3, new OffsetForLeaderEpochRequestData().setReplicaId(5), new OffsetForLeaderEpochRequestData().setReplicaId(-2)); - } @Test @@ -743,7 +824,7 @@ public final class MessageTest { * Test that the JSON request files match the schemas accessible through the ApiKey class. */ @Test - public void testRequestSchemas() throws Exception { + public void testRequestSchemas() { for (ApiKeys apiKey : ApiKeys.values()) { Schema[] manualSchemas = apiKey.requestSchemas; Schema[] generatedSchemas = ApiMessageType.fromApiKey(apiKey.id).requestSchemas(); @@ -799,13 +880,9 @@ public final class MessageTest { return true; } if (type.getClass().equals(Type.RECORDS.getClass())) { - if (other.type.getClass().equals(Type.NULLABLE_BYTES.getClass())) { - return true; - } + return other.type.getClass().equals(Type.NULLABLE_BYTES.getClass()); } else if (type.getClass().equals(Type.NULLABLE_BYTES.getClass())) { - if (other.type.getClass().equals(Type.RECORDS.getClass())) { - return true; - } + return other.type.getClass().equals(Type.RECORDS.getClass()); } return false; } @@ -870,7 +947,7 @@ public final class MessageTest { } @Test - public void testDefaultValues() throws Exception { + public void testDefaultValues() { verifyWriteRaisesUve((short) 0, "validateOnly", new CreateTopicsRequestData().setValidateOnly(true)); verifyWriteSucceeds((short) 0, @@ -883,7 +960,7 @@ public final class MessageTest { } @Test - public void testNonIgnorableFieldWithDefaultNull() throws 
Exception { + public void testNonIgnorableFieldWithDefaultNull() { // Test non-ignorable string field `groupInstanceId` with default null verifyWriteRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData() .setGroupId("groupId") @@ -902,7 +979,7 @@ public final class MessageTest { } @Test - public void testWriteNullForNonNullableFieldRaisesException() throws Exception { + public void testWriteNullForNonNullableFieldRaisesException() { CreateTopicsRequestData createTopics = new CreateTopicsRequestData().setTopics(null); for (short i = (short) 0; i <= createTopics.highestSupportedVersion(); i++) { verifyWriteRaisesNpe(i, createTopics); @@ -912,7 +989,7 @@ public final class MessageTest { } @Test - public void testUnknownTaggedFields() throws Exception { + public void testUnknownTaggedFields() { CreateTopicsRequestData createTopics = new CreateTopicsRequestData(); verifyWriteSucceeds((short) 6, createTopics); RawTaggedField field1000 = new RawTaggedField(1000, new byte[] {0x1, 0x2, 0x3}); @@ -921,7 +998,7 @@ public final class MessageTest { verifyWriteSucceeds((short) 6, createTopics); } - private void verifyWriteRaisesNpe(short version, Message message) throws Exception { + private void verifyWriteRaisesNpe(short version, Message message) { ObjectSerializationCache cache = new ObjectSerializationCache(); assertThrows(NullPointerException.class, () -> { int size = message.size(cache, version); @@ -933,7 +1010,7 @@ public final class MessageTest { private void verifyWriteRaisesUve(short version, String problemText, - Message message) throws Exception { + Message message) { ObjectSerializationCache cache = new ObjectSerializationCache(); UnsupportedVersionException e = assertThrows(UnsupportedVersionException.class, () -> { @@ -947,7 +1024,7 @@ public final class MessageTest { e.getMessage().contains(problemText)); } - private void verifyWriteSucceeds(short version, Message message) throws Exception { + private void verifyWriteSucceeds(short version, Message 
message) { ObjectSerializationCache cache = new ObjectSerializationCache(); int size = message.size(cache, version); ByteBuffer buf = ByteBuffer.allocate(size * 2);