kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Add ignorable field check to `toStruct` and fix usage (#7710)
Date Sat, 23 Nov 2019 06:05:32 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b94c7f4  MINOR: Add ignorable field check to `toStruct` and fix usage (#7710)
b94c7f4 is described below

commit b94c7f479b917d4ec602c31a24f11390627c479b
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Fri Nov 22 22:05:03 2019 -0800

    MINOR: Add ignorable field check to `toStruct` and fix usage (#7710)
    
    If a field is not marked as ignorable, we should raise an exception if it has been set
to a non-default value. This check already exists in `Message.write`, so this patch adds it
to `Message.toStruct`. Additionally, we fix several fields which should have been marked ignorable
and we fix some related test assertions.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy <manikumar.reddy@gmail.com>,
Colin Patrick McCabe <cmccabe@apache.org>
---
 .../common/requests/DescribeGroupsResponse.java    |  4 ++-
 .../kafka/common/requests/MetadataResponse.java    |  2 +-
 .../common/message/ApiVersionsRequest.json         |  4 +--
 .../common/message/OffsetFetchResponse.json        |  4 +--
 .../common/message/SaslAuthenticateResponse.json   |  2 +-
 .../common/message/SimpleExampleMessageTest.java   | 12 +++++++
 .../common/requests/JoinGroupRequestTest.java      |  1 -
 .../kafka/common/requests/RequestResponseTest.java | 36 ++++++++++-----------
 core/src/main/scala/kafka/server/KafkaApis.scala   | 26 ++++++++-------
 .../kafka/api/BaseAdminIntegrationTest.scala       |  6 ++--
 .../api/DescribeAuthorizedOperationsTest.scala     | 18 +++++------
 .../unit/kafka/server/MetadataRequestTest.scala    | 33 ++++++++++++-------
 .../apache/kafka/message/MessageDataGenerator.java | 37 +++++++++++++---------
 13 files changed, 108 insertions(+), 77 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index cb369a0..4ba4ede 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -33,6 +33,8 @@ import java.util.Set;
 
 public class DescribeGroupsResponse extends AbstractResponse {
 
+    public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
+
     /**
      * Possible per-group error codes:
      *
@@ -135,7 +137,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
 
     public static DescribedGroup forError(String groupId, Errors error) {
         return groupMetadata(groupId, error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
-                DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), Collections.emptySet());
+                DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
     }
 
     public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index ef5381b..9b263b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -232,7 +232,7 @@ public class MetadataResponse extends AbstractResponse {
                              String topic,
                              boolean isInternal,
                              List<PartitionMetadata> partitionMetadata) {
-            this(error, topic, isInternal, partitionMetadata, 0);
+            this(error, topic, isInternal, partitionMetadata, AUTHORIZED_OPERATIONS_OMITTED);
         }
 
         public Errors error() {
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 79fe7a7..66e4511 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -24,8 +24,8 @@
   "flexibleVersions": "3+",
   "fields": [
     { "name": "ClientSoftwareName", "type": "string", "versions": "3+",
-      "about": "The name of the client." },
+      "ignorable": true, "about": "The name of the client." },
     { "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
-      "about": "The version of the client." }
+      "ignorable": true, "about": "The version of the client." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index e0d5e35..97bc6dd 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -44,14 +44,14 @@
         { "name": "CommittedOffset", "type": "int64", "versions": "0+",
           "about": "The committed message offset." },
         { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
-          "about": "The leader epoch." },
+          "ignorable": true, "about": "The leader epoch." },
         { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
           "about": "The partition metadata." },
         { "name": "ErrorCode", "type": "int16", "versions": "0+",
           "about": "The error code, or 0 if there was no error." }
       ]}
     ]},
-    { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": false,
+    { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true,
       "about": "The top-level error code, or 0 if there was no error." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/SaslAuthenticateResponse.json b/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
index 66dfe74..1ad665f 100644
--- a/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
+++ b/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
@@ -27,7 +27,7 @@
       "about": "The error message, or null if there was no error." },
     { "name": "AuthBytes", "type": "bytes", "versions": "0+",
      "about": "The SASL authentication bytes from the server, as defined by the SASL mechanism." },
-    { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+", "default": "0",
+    { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+", "default": "0", "ignorable": true,
      "about": "The SASL authentication bytes from the server, as defined by the SASL mechanism." }
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 4c697f2..8d09fa7 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.message;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -31,6 +32,7 @@ import java.util.function.Consumer;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class SimpleExampleMessageTest {
@@ -54,6 +56,16 @@ public class SimpleExampleMessageTest {
     }
 
     @Test
+    public void shouldThrowIfCannotWriteNonIgnorableField() {
+        // processId is not supported in v0 and is not marked as ignorable
+
+        final SimpleExampleMessageData out = new SimpleExampleMessageData().setProcessId(UUID.randomUUID());
+        assertThrows(UnsupportedVersionException.class, () ->
+                out.write(new ByteBufferAccessor(ByteBuffer.allocate(64)), new ObjectSerializationCache(), (short) 0));
+        assertThrows(UnsupportedVersionException.class, () -> out.toStruct((short) 0));
+    }
+
+    @Test
     public void shouldDefaultField() {
         final SimpleExampleMessageData out = new SimpleExampleMessageData();
         assertEquals(UUID.fromString("00000000-0000-0000-0000-000000000000"), out.processId());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
index 9d8031c..125a328 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -86,7 +86,6 @@ public class JoinGroupRequestTest {
         Struct struct = new JoinGroupRequestData()
                 .setGroupId("groupId")
                 .setMemberId("consumerId")
-                .setGroupInstanceId("groupInstanceId")
                 .setProtocolType("consumer")
                 .setSessionTimeoutMs(sessionTimeoutMs)
                 .toStruct((short) 0);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d5db9b2..f8f9dd4 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
@@ -34,30 +34,32 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
-import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
 import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
-import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
@@ -68,15 +70,11 @@ import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
-import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
-import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -87,6 +85,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.OffsetDeleteRequestData;
@@ -1039,12 +1039,12 @@ public class RequestResponseTest {
         DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId", null,
                 clientId, clientHost, new byte[0], new byte[0]);
         DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group",
-                                                                       Errors.NONE,
-                                                                       "STABLE",
-                                                                       "consumer",
-                                                                       "roundrobin",
-                                                                       Collections.singletonList(member),
-                                                                       Collections.emptySet());
+                Errors.NONE,
+                "STABLE",
+                "consumer",
+                "roundrobin",
+                Collections.singletonList(member),
+                DescribeGroupsResponse.AUTHORIZED_OPERATIONS_OMITTED);
         describeGroupsResponseData.groups().add(metadata);
         return new DescribeGroupsResponse(describeGroupsResponseData);
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4c83423..12621ad 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1129,18 +1129,22 @@ class KafkaApis(val requestChannel: RequestChannel,
         getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
           errorUnavailableEndpoints, errorUnavailableListeners)
 
-    var clusterAuthorizedOperations = 0
-
+    var clusterAuthorizedOperations = Int.MinValue
     if (request.header.apiVersion >= 8) {
       // get cluster authorized operations
-      if (metadataRequest.data().includeClusterAuthorizedOperations() &&
-        authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
-        clusterAuthorizedOperations = authorizedOperations(request, Resource.CLUSTER)
+      if (metadataRequest.data.includeClusterAuthorizedOperations) {
+        if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
+          clusterAuthorizedOperations = authorizedOperations(request, Resource.CLUSTER)
+        else
+          clusterAuthorizedOperations = 0
+      }
+
       // get topic authorized operations
-      if (metadataRequest.data().includeTopicAuthorizedOperations())
-        topicMetadata.foreach(topicData => {
-          topicData.authorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.topic())))
-        })
+      if (metadataRequest.data.includeTopicAuthorizedOperations) {
+        topicMetadata.foreach { topicData =>
+          topicData.authorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.topic)))
+        }
+      }
     }
 
     val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
@@ -1337,10 +1341,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setMembers(members.asJava)
 
         if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data().includeAuthorizedOperations()) {
+          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
             describedGroup.setAuthorizedOperations(authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
-          } else {
-            describedGroup.setAuthorizedOperations(0)
           }
         }
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index dd115b2..d680dfb 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -158,7 +158,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness
with Logg
 
     // without includeAuthorizedOperations flag
     var result = client.describeCluster
-    assertEquals(Set().asJava, result.authorizedOperations().get())
+    assertNull(result.authorizedOperations().get())
 
     //with includeAuthorizedOperations flag
     result = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true))
@@ -172,7 +172,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness
with Logg
 
     // without includeAuthorizedOperations flag
     var topicResult = getTopicMetadata(client, topic)
-    assertEquals(Set().asJava, topicResult.authorizedOperations)
+    assertNull(topicResult.authorizedOperations)
 
     //with includeAuthorizedOperations flag
     topicResult = getTopicMetadata(client, topic, new DescribeTopicsOptions().includeAuthorizedOperations(true))
@@ -181,7 +181,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness
with Logg
     assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
-  def configuredClusterPermissions() : Set[AclOperation] = {
+  def configuredClusterPermissions(): Set[AclOperation] = {
     Cluster.supportedOperations.map(operation => operation.toJava)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index bda8271..3022af1 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -21,14 +21,14 @@ import kafka.security.authorizer.AclAuthorizer
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALTER, DESCRIBE, CLUSTER_ACTION}
+import org.apache.kafka.common.acl.AclOperation.{ALTER, CLUSTER_ACTION, DESCRIBE}
 import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.Authorizer
-import org.junit.Assert.{assertEquals, assertFalse}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull}
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -96,7 +96,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with
SaslS
   val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group3, PatternType.LITERAL),
     new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2,
"*", AclOperation.DELETE, AclPermissionType.ALLOW))
 
-  val clusteAllAcl = new AclBinding(Resource.ClusterResource.toPattern,
+  val clusterAllAcl = new AclBinding(Resource.ClusterResource.toPattern,
     new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2,
"*", AclOperation.ALL, AclPermissionType.ALLOW))
 
   val topic1Acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic1, PatternType.LITERAL),
@@ -145,7 +145,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness
with SaslS
 
     // test without includeAuthorizedOperations flag
     var clusterDescribeResult = client.describeCluster()
-    assertEquals(Set(), clusterDescribeResult.authorizedOperations().get().asScala.toSet)
+    assertNull(clusterDescribeResult.authorizedOperations.get())
 
     //test with includeAuthorizedOperations flag, we have give Alter permission
     // in configureSecurityBeforeServersStart()
@@ -155,8 +155,8 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness
with SaslS
       clusterDescribeResult.authorizedOperations().get().asScala.toSet)
 
     // enable all operations for cluster resource
-    val results = client.createAcls(List(clusteAllAcl).asJava)
-    assertEquals(Set(clusteAllAcl), results.values.keySet.asScala)
+    val results = client.createAcls(List(clusterAllAcl).asJava)
+    assertEquals(Set(clusterAllAcl), results.values.keySet.asScala)
     results.all.get
 
     val expectedOperations = Cluster.supportedOperations
@@ -175,8 +175,8 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness
with SaslS
 
     // test without includeAuthorizedOperations flag
     var describeTopicsResult = client.describeTopics(Set(topic1, topic2).asJava).all.get()
-    assertEquals(Set(), describeTopicsResult.get(topic1).authorizedOperations().asScala.toSet)
-    assertEquals(Set(), describeTopicsResult.get(topic2).authorizedOperations().asScala.toSet)
+    assertNull(describeTopicsResult.get(topic1).authorizedOperations)
+    assertNull(describeTopicsResult.get(topic2).authorizedOperations)
 
     //test with includeAuthorizedOperations flag
     describeTopicsResult = client.describeTopics(Set(topic1, topic2).asJava,
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 2b3f321..df812e8 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -22,13 +22,15 @@ import java.util.Properties
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.apache.kafka.test.TestUtils.isValidClusterId
 import org.junit.Assert._
 import org.junit.{Before, Test}
-import org.apache.kafka.test.TestUtils.isValidClusterId
+import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 import scala.collection.Seq
@@ -126,8 +128,7 @@ class MetadataRequestTest extends BaseRequestTest {
 
   @Test
   def testAutoTopicCreation(): Unit = {
-    def checkAutoCreatedTopic(existingTopic: String, autoCreatedTopic: String, response: MetadataResponse): Unit = {
-      assertNull(response.errors.get(existingTopic))
+    def checkAutoCreatedTopic(autoCreatedTopic: String, response: MetadataResponse): Unit = {
       assertEquals(Errors.LEADER_NOT_AVAILABLE, response.errors.get(autoCreatedTopic))
       assertEquals(Some(servers.head.config.numPartitions), zkClient.getTopicPartitionCount(autoCreatedTopic))
       for (i <- 0 until servers.head.config.numPartitions)
@@ -138,20 +139,28 @@ class MetadataRequestTest extends BaseRequestTest {
     val topic2 = "t2"
     val topic3 = "t3"
     val topic4 = "t4"
-    createTopic(topic1, 1, 1)
+    val topic5 = "t5"
+    createTopic(topic1, numPartitions = 1, replicationFactor = 1)
 
-    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion).build())
-    checkAutoCreatedTopic(topic1, topic2, response1)
+    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
+    assertNull(response1.errors.get(topic1))
+    checkAutoCreatedTopic(topic2, response1)
 
-    // V3 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect
-    val response2 = sendMetadataRequest(new MetadataRequest(requestData(List(topic2, topic3), false), 3.toShort))
-    checkAutoCreatedTopic(topic2, topic3, response2)
+    // The default behavior in old versions of the metadata API is to allow topic creation, so
+    // protocol downgrades should happen gracefully when auto-creation is explicitly requested.
+    val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
+    checkAutoCreatedTopic(topic3, response2)
+
+    // V3 doesn't support a configurable allowAutoTopicCreation, so disabling auto-creation is not supported
+    intercept[UnsupportedVersionException] {
+      sendMetadataRequest(new MetadataRequest(requestData(List(topic4), false), 3.toShort))
+    }
 
     // V4 and higher support a configurable allowAutoTopicCreation
-    val response3 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3, topic4).asJava,
false, 4.toShort).build)
-    assertNull(response3.errors.get(topic3))
+    val response3 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic4, topic5).asJava,
false, 4.toShort).build)
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
-    assertEquals(None, zkClient.getTopicPartitionCount(topic4))
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic5))
+    assertEquals(None, zkClient.getTopicPartitionCount(topic5))
   }
 
   @Test
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index e1a41d6..263da12 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -843,6 +843,21 @@ public final class MessageDataGenerator {
         }
     }
 
+    private void maybeGenerateNonIgnorableFieldCheck(FieldSpec field, VersionConditional cond) {
+        if (!field.ignorable()) {
+            cond.ifNotMember(__ -> {
+                generateNonDefaultValueCheck(field, field.nullableVersions());
+                buffer.incrementIndent();
+                headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
+                buffer.printf("throw new UnsupportedVersionException(" +
+                                "\"Attempted to write a non-default %s at version \" + _version);%n",
+                        field.camelCaseName());
+                buffer.decrementIndent();
+                buffer.printf("}%n");
+            });
+        }
+    }
+
     private void generateClassWriter(String className, StructSpec struct,
             Versions parentVersions) {
         headerGenerator.addImport(MessageGenerator.WRITABLE_CLASS);
@@ -908,18 +923,8 @@ public final class MessageDataGenerator {
                         }).
                         generate(buffer);
                 });
-            if (!field.ignorable()) {
-                cond.ifNotMember(__ -> {
-                    generateNonDefaultValueCheck(field, field.nullableVersions());
-                    buffer.incrementIndent();
-                    headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
-                    buffer.printf("throw new UnsupportedVersionException(" +
-                            "\"Attempted to write a non-default %s at version \" + _version);%n",
-                        field.camelCaseName());
-                    buffer.decrementIndent();
-                    buffer.printf("}%n");
-                });
-            }
+
+            maybeGenerateNonIgnorableFieldCheck(field, cond);
             cond.generate(buffer);
         }
         headerGenerator.addImport(MessageGenerator.RAW_TAGGED_FIELD_WRITER_CLASS);
@@ -1216,7 +1221,7 @@ public final class MessageDataGenerator {
             generate(buffer);
         buffer.printf("Struct struct = new Struct(SCHEMAS[_version]);%n");
         for (FieldSpec field : struct.fields()) {
-            VersionConditional.forVersions(field.versions(), curVersions).
+            VersionConditional cond = VersionConditional.forVersions(field.versions(), curVersions).
                 alwaysEmitBlockScope(field.type().isArray()).
                 ifMember(presentVersions -> {
                     VersionConditional.forVersions(field.taggedVersions(), presentVersions).
@@ -1231,8 +1236,10 @@ public final class MessageDataGenerator {
                             buffer.printf("}%n");
                         }).
                         generate(buffer);
-                }).
-                generate(buffer);
+                });
+
+            maybeGenerateNonIgnorableFieldCheck(field, cond);
+            cond.generate(buffer);
         }
         VersionConditional.forVersions(messageFlexibleVersions, curVersions).
             ifMember(__ -> {


Mime
View raw message