kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8560; The Kafka protocol generator should support common structures
Date Tue, 02 Jul 2019 16:41:19 GMT
This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 711c817  KAFKA-8560; The Kafka protocol generator should support common structures
711c817 is described below

commit 711c817254eaf62d91b0d783fdd04c5f1915fa75
Author: Colin P. Mccabe <cmccabe@confluent.io>
AuthorDate: Tue Jul 2 09:40:54 2019 -0700

    KAFKA-8560; The Kafka protocol generator should support common structures
    
    Author: Colin P. Mccabe <cmccabe@confluent.io>
    
    Reviewers: Gwen Shapira
    
    Closes #6966 from cmccabe/KAFKA-8560
---
 .../common/message/LeaderAndIsrRequest.json        |  67 ++++------
 .../common/message/UpdateMetadataRequest.json      |  68 ++++------
 .../java/org/apache/kafka/message/FieldSpec.java   |   7 -
 .../java/org/apache/kafka/message/FieldType.java   |   4 +
 .../apache/kafka/message/MessageDataGenerator.java |  31 +++--
 .../java/org/apache/kafka/message/MessageSpec.java |  14 +-
 .../org/apache/kafka/message/SchemaGenerator.java  |  20 ++-
 .../org/apache/kafka/message/StructRegistry.java   | 144 +++++++++++++++++++++
 .../apache/kafka/message/StructRegistryTest.java   | 124 ++++++++++++++++++
 9 files changed, 379 insertions(+), 100 deletions(-)

diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index dfbf6b5..a449b86 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -28,59 +28,48 @@
       "about": "The current controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default":
"-1",
       "about": "The current broker epoch." },
+    { "name": "PartitionStatesV0", "type": "[]LeaderAndIsrRequestPartition", "versions":
"0-1",
+      "about": "The state of each partition, in a v0 or v1 message." },
+    // In v0 or v1 requests, each partition is listed alongside its topic name.
+    // In v2+ requests, partitions are organized by topic, so that each topic name
+    // only needs to be listed once.
     { "name": "TopicStates", "type": "[]LeaderAndIsrRequestTopicState", "versions": "2+",
       "about": "Each topic.", "fields": [
       { "name": "Name", "type": "string", "versions": "2+", "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "PartitionStates", "type": "[]LeaderAndIsrRequestPartitionState", "versions":
"0+",
-        "about": "The state of each partition", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
-          "about": "The partition index." },
-        { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
-          "about": "The controller epoch." },
-        { "name": "LeaderKey", "type": "int32", "versions": "0+", "entityType": "brokerId",
-          "about": "The broker ID of the leader." },
-        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
-          "about": "The leader epoch." },
-        { "name": "IsrReplicas", "type": "[]int32", "versions": "0+",
-          "about": "The in-sync replica IDs." },
-        { "name": "ZkVersion", "type": "int32", "versions": "0+",
-          "about": "The ZooKeeper version." },
-        { "name": "Replicas", "type": "[]int32", "versions": "0+",
-          "about": "The replica IDs." },
-        { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable":
true, 
-          "about": "Whether the replica should have existed on the broker or not." }
-      ]}
+      { "name": "PartitionStatesV0", "type": "[]LeaderAndIsrRequestPartition", "versions":
"2+",
+        "about": "The state of each partition" }
     ]},
-    { "name": "PartitionStatesV0", "type": "[]LeaderAndIsrRequestPartitionStateV0", "versions":
"0-1",
-      "about": "The state of each partition", "fields": [
+    { "name": "LiveLeaders", "type": "[]LeaderAndIsrLiveLeader", "versions": "0+",
+      "about": "The current live leaders.", "fields": [
+      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+        "about": "The leader's broker ID." },
+      { "name": "HostName", "type": "string", "versions": "0+",
+        "about": "The leader's hostname." },
+      { "name": "Port", "type": "int32", "versions": "0+",
+        "about": "The leader's port." }
+    ]}
+  ],
+  "commonStructs": [
+    { "name": "LeaderAndIsrRequestPartition", "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0-1", "entityType": "topicName",
-        "about": "The topic name." },
-      { "name": "PartitionIndex", "type": "int32", "versions": "0-1",
+        "about": "The topic name.  This is only present in v0 or v1." },
+      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
         "about": "The partition index." },
-      { "name": "ControllerEpoch", "type": "int32", "versions": "0-1",
+      { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
         "about": "The controller epoch." },
-      { "name": "LeaderKey", "type": "int32", "versions": "0-1", "entityType": "brokerId",
+      { "name": "LeaderKey", "type": "int32", "versions": "0+", "entityType": "brokerId",
         "about": "The broker ID of the leader." },
-      { "name": "LeaderEpoch", "type": "int32", "versions": "0-1",
+      { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
         "about": "The leader epoch." },
-      { "name": "IsrReplicas", "type": "[]int32", "versions": "0-1",
+      { "name": "IsrReplicas", "type": "[]int32", "versions": "0+",
         "about": "The in-sync replica IDs." },
-      { "name": "ZkVersion", "type": "int32", "versions": "0-1",
+      { "name": "ZkVersion", "type": "int32", "versions": "0+",
         "about": "The ZooKeeper version." },
-      { "name": "Replicas", "type": "[]int32", "versions": "0-1",
+      { "name": "Replicas", "type": "[]int32", "versions": "0+",
         "about": "The replica IDs." },
-      { "name": "IsNew", "type": "bool", "versions": "1", "default": "false", "ignorable":
true, 
+      { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable":
true,
         "about": "Whether the replica should have existed on the broker or not." }
-    ]},
-    { "name": "LiveLeaders", "type": "[]LeaderAndIsrLiveLeader", "versions": "0+",
-      "about": "The current live leaders.", "fields": [
-      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
-        "about": "The leader's broker ID." },
-      { "name": "HostName", "type": "string", "versions": "0+",
-        "about": "The leader's hostname." },
-      { "name": "Port", "type": "int32", "versions": "0+",
-        "about": "The leader's port." }
     ]}
   ]
 }
diff --git a/clients/src/main/resources/common/message/UpdateMetadataRequest.json b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
index f23e2de..d0873d1 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
@@ -34,50 +34,14 @@
       "about": "The controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "5+", "ignorable": true, "default":
"-1",
       "about": "The broker epoch." },
+    { "name": "LegacyPartitionStates", "type": "[]UpdateMetadataPartitionState", "versions":
"0-4",
+      "about": "In older versions of this RPC, each partition that we would like to update."
},
     { "name": "TopicStates", "type": "[]UpdateMetadataRequestTopicState", "versions": "5+",
-      "about": "Each topic that we would like to update.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
+      "about": "In newer versions of this RPC, each topic that we would like to update.",
"fields": [
+      { "name": "TopicName", "type": "string", "versions": "5+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionStates", "type": "[]UpdateMetadataPartitionState", "versions":
"5+",
-        "about": "The partition that we would like to update.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "5+",
-          "about": "The partition index." },
-        { "name": "ControllerEpoch", "type": "int32", "versions": "5+",
-          "about": "The controller epoch." },
-        { "name": "Leader", "type": "int32", "versions": "5+", "entityType": "brokerId",
-          "about": "The ID of the broker which is the current partition leader." },
-        { "name": "LeaderEpoch", "type": "int32", "versions": "5+",
-          "about": "The leader epoch of this partition." },
-        { "name": "Isr", "type": "[]int32", "versions": "5+", "entityType": "brokerId",
-          "about": "The brokers which are in the ISR for this partition." },
-        { "name": "ZkVersion", "type": "int32", "versions": "5+",
-          "about": "The Zookeeper version." },
-        { "name": "Replicas", "type": "[]int32", "versions": "5+", "entityType": "brokerId",
-          "about": "All the replicas of this partition." },
-        { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+",
-          "about": "The replicas of this partition which are offline." }
-      ]}
-    ]},
-    { "name": "PartitionStatesV0", "type": "[]UpdateMetadataRequestPartitionStateV0", "versions":
"0-4",
-      "about": "Each partition that we would like to update.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0-4", "entityType": "topicName",
-        "about": "The topic name." },
-      { "name": "PartitionIndex", "type": "int32", "versions": "0-4",
-        "about": "The partition index." },
-      { "name": "ControllerEpoch", "type": "int32", "versions": "0-4",
-        "about": "The controller epoch." },
-      { "name": "Leader", "type": "int32", "versions": "0-4", "entityType": "brokerId",
-        "about": "The ID of the broker which is the current partition leader." },
-      { "name": "LeaderEpoch", "type": "int32", "versions": "0-4",
-        "about": "The leader epoch of this partition." },
-      { "name": "Isr", "type": "[]int32", "versions": "0-4", "entityType": "brokerId",
-        "about": "The brokers which are in the ISR for this partition." },
-      { "name": "ZkVersion", "type": "int32", "versions": "0-4",
-        "about": "The Zookeeper version." },
-      { "name": "Replicas", "type": "[]int32", "versions": "0-4", "entityType": "brokerId",
-        "about": "All the replicas of this partition." },
-      { "name": "OfflineReplicas", "type": "[]int32", "versions": "4", "entityType": "brokerId",
-        "about": "The replicas of this partition which are offline." }
+        "about": "The partition that we would like to update." }
     ]},
     { "name": "Brokers", "type": "[]UpdateMetadataRequestBroker", "versions": "0+", "fields":
[
         { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -102,5 +66,27 @@
         { "name": "Rack", "type": "string", "versions": "2+", "nullableVersions": "0+", "ignorable":
true,
           "about": "The rack which this broker belongs to." }
     ]}
+  ],
+  "commonStructs": [
+    { "name": "UpdateMetadataPartitionState", "versions": "0+", "fields": [
+      { "name": "TopicName", "type": "string", "versions": "0-4", "entityType": "topicName",
+        "about": "In older versions of this RPC, the topic name." },
+      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+        "about": "The partition index." },
+      { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
+        "about": "The controller epoch." },
+      { "name": "Leader", "type": "int32", "versions": "0+", "entityType": "brokerId",
+        "about": "The ID of the broker which is the current partition leader." },
+      { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
+        "about": "The leader epoch of this partition." },
+      { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
+        "about": "The brokers which are in the ISR for this partition." },
+      { "name": "ZkVersion", "type": "int32", "versions": "0+",
+        "about": "The Zookeeper version." },
+      { "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
+        "about": "All the replicas of this partition." },
+      { "name": "OfflineReplicas", "type": "[]int32", "versions": "4+", "entityType": "brokerId",
+        "about": "The replicas of this partition which are offline." }
+    ]}
   ]
 }
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
index 3652aba..7e5b37c 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
@@ -86,13 +86,6 @@ public final class FieldSpec {
         }
     }
 
-    public StructSpec toStruct() {
-        if ((!this.type.isArray()) && (this.type.isStruct())) {
-            throw new RuntimeException("Field " + name + " cannot be treated as a structure.");
-        }
-        return new StructSpec(name, versions.toString(), fields);
-    }
-
     @JsonProperty("name")
     public String name() {
         return name;
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java b/generator/src/main/java/org/apache/kafka/message/FieldType.java
index c920b8a..d98b8f3 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldType.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java
@@ -181,6 +181,10 @@ public interface FieldType {
             return elementType;
         }
 
+        public String elementName() {
+            return elementType.toString();
+        }
+
         @Override
         public String toString() {
             return "[]" + elementType.toString();
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index ad00137..ee0c3df 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.message;
 
 import java.io.Writer;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -27,13 +28,15 @@ import java.util.stream.Collectors;
  * Generates Kafka MessageData classes.
  */
 public final class MessageDataGenerator {
+    private final StructRegistry structRegistry;
     private final HeaderGenerator headerGenerator;
     private final SchemaGenerator schemaGenerator;
     private final CodeBuffer buffer;
 
     MessageDataGenerator() {
+        this.structRegistry = new StructRegistry();
         this.headerGenerator = new HeaderGenerator();
-        this.schemaGenerator = new SchemaGenerator(headerGenerator);
+        this.schemaGenerator = new SchemaGenerator(headerGenerator, structRegistry);
         this.buffer = new CodeBuffer();
     }
 
@@ -42,6 +45,7 @@ public final class MessageDataGenerator {
             throw new RuntimeException("Message " + message.name() + " does " +
                 "not specify a maximum version.");
         }
+        structRegistry.register(message);
         schemaGenerator.generateSchemas(message);
         generateClass(Optional.of(message),
             message.name() + "Data",
@@ -104,6 +108,13 @@ public final class MessageDataGenerator {
         }
         generateSubclasses(className, struct, parentVersions, isSetElement);
         if (isTopLevel) {
+            for (Iterator<StructSpec> iter = structRegistry.commonStructs(); iter.hasNext();
) {
+                StructSpec commonStruct = iter.next();
+                generateClass(Optional.empty(),
+                        commonStruct.name(),
+                        commonStruct,
+                        commonStruct.versions());
+            }
             buffer.decrementIndent();
             buffer.printf("}%n");
         }
@@ -139,10 +150,12 @@ public final class MessageDataGenerator {
         for (FieldSpec field : struct.fields()) {
             if (field.type().isStructArray()) {
                 FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
-                generateClass(Optional.empty(),
-                    arrayType.elementType().toString(),
-                    field.toStruct(),
-                    parentVersions.intersect(struct.versions()));
+                if (!structRegistry.commonStructNames().contains(arrayType.elementName()))
{
+                    generateClass(Optional.empty(),
+                            arrayType.elementType().toString(),
+                            structRegistry.findStruct(field),
+                            parentVersions.intersect(struct.versions()));
+                }
             }
         }
         if (isSetElement) {
@@ -306,7 +319,7 @@ public final class MessageDataGenerator {
             return MessageGenerator.capitalizeFirst(field.typeString());
         } else if (field.type().isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
-            if (field.toStruct().hasKeys()) {
+            if (structRegistry.isStructArrayWithKeys(field)) {
                 headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
                 return collectionType(arrayType.elementType().toString());
             } else {
@@ -321,7 +334,7 @@ public final class MessageDataGenerator {
     private String fieldConcreteJavaType(FieldSpec field) {
         if (field.type().isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
-            if (field.toStruct().hasKeys()) {
+            if (structRegistry.isStructArrayWithKeys(field)) {
                 headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
                 return collectionType(arrayType.elementType().toString());
             } else {
@@ -412,7 +425,7 @@ public final class MessageDataGenerator {
                 buffer.printf("{%n");
                 buffer.incrementIndent();
             }
-            boolean hasKeys = field.toStruct().hasKeys();
+            boolean hasKeys = structRegistry.isStructArrayWithKeys(field);
             buffer.printf("int arrayLength = readable.readInt();%n");
             buffer.printf("if (arrayLength < 0) {%n");
             buffer.incrementIndent();
@@ -1236,7 +1249,7 @@ public final class MessageDataGenerator {
                     field.name() + ": custom defaults are not supported for array fields.");
             }
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
-            if (field.toStruct().hasKeys()) {
+            if (structRegistry.isStructArrayWithKeys(field)) {
                 return "new " + collectionType(arrayType.elementType().toString()) + "(0)";
             } else {
                 headerGenerator.addImport(MessageGenerator.ARRAYLIST_CLASS);
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
index 9e3eb38..30b1fe4 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
@@ -20,6 +20,8 @@ package org.apache.kafka.message;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -31,15 +33,20 @@ public final class MessageSpec {
 
     private final MessageSpecType type;
 
+    private final List<StructSpec> commonStructs;
+
     @JsonCreator
     public MessageSpec(@JsonProperty("name") String name,
                        @JsonProperty("validVersions") String validVersions,
                        @JsonProperty("fields") List<FieldSpec> fields,
                        @JsonProperty("apiKey") Short apiKey,
-                       @JsonProperty("type") MessageSpecType type) {
+                       @JsonProperty("type") MessageSpecType type,
+                       @JsonProperty("commonStructs") List<StructSpec> commonStructs)
{
         this.struct = new StructSpec(name, validVersions, fields);
         this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
         this.type = Objects.requireNonNull(type);
+        this.commonStructs = commonStructs == null ? Collections.emptyList() :
+                Collections.unmodifiableList(new ArrayList<>(commonStructs));
     }
 
     public StructSpec struct() {
@@ -71,6 +78,11 @@ public final class MessageSpec {
         return type;
     }
 
+    @JsonProperty("commonStructs")
+    public List<StructSpec> commonStructs() {
+        return commonStructs;
+    }
+
     public String generatedClassName() {
         return struct.name() + "Data";
     }
diff --git a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
index e89599d..2ef3868 100644
--- a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.message;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -58,18 +59,31 @@ final class SchemaGenerator {
     private final HeaderGenerator headerGenerator;
 
     /**
+     * A registry with the structures we're generating.
+     */
+    private final StructRegistry structRegistry;
+
+    /**
      * Maps message names to message information.
      */
     private final Map<String, MessageInfo> messages;
 
-    SchemaGenerator(HeaderGenerator headerGenerator) {
+    SchemaGenerator(HeaderGenerator headerGenerator, StructRegistry structRegistry) {
         this.headerGenerator = headerGenerator;
+        this.structRegistry = structRegistry;
         this.messages = new HashMap<>();
     }
 
     void generateSchemas(MessageSpec message) throws Exception {
+        // Generate schemas for inline structures
         generateSchemas(message.generatedClassName(), message.struct(),
             message.struct().versions());
+
+        // Generate schemas for common structures
+        for (Iterator<StructSpec> iter = structRegistry.commonStructs(); iter.hasNext();
) {
+            StructSpec struct = iter.next();
+            generateSchemas(struct.name(), struct, struct.versions());
+        }
     }
 
     void generateSchemas(String className, StructSpec struct,
@@ -85,9 +99,9 @@ final class SchemaGenerator {
         for (FieldSpec field : struct.fields()) {
             if (field.type().isStructArray()) {
                 FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
-                generateSchemas(arrayType.elementType().toString(), field.toStruct(), versions);
+                generateSchemas(arrayType.elementType().toString(), structRegistry.findStruct(field),
versions);
             } else if (field.type().isStruct()) {
-                generateSchemas(field.type().toString(), field.toStruct(), versions);
+                generateSchemas(field.type().toString(), structRegistry.findStruct(field),
versions);
             }
         }
         CodeBuffer prev = null;
diff --git a/generator/src/main/java/org/apache/kafka/message/StructRegistry.java b/generator/src/main/java/org/apache/kafka/message/StructRegistry.java
new file mode 100644
index 0000000..97ffba5
--- /dev/null
+++ b/generator/src/main/java/org/apache/kafka/message/StructRegistry.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.message;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Contains structure data for Kafka MessageData classes.
+ */
+final class StructRegistry {
+    private final Map<String, StructSpec> structSpecs;
+    private final Set<String> commonStructNames;
+
+    StructRegistry() {
+        this.structSpecs = new TreeMap<>();
+        this.commonStructNames = new TreeSet<>();
+    }
+
+    /**
+     * Register all the structures contained a message spec.
+     */
+    void register(MessageSpec message) throws Exception {
+        // Register common structures.
+        for (StructSpec struct : message.commonStructs()) {
+            if (!MessageGenerator.firstIsCapitalized(struct.name())) {
+                throw new RuntimeException("Can't process structure " + struct.name() +
+                        ": the first letter of structure names must be capitalized.");
+            }
+            if (structSpecs.put(struct.name(), struct) != null) {
+                throw new RuntimeException("Common struct " + struct.name() + " was specified
twice.");
+            }
+            commonStructNames.add(struct.name());
+        }
+
+        // Register inline structures.
+        addStructSpecs(message.fields());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void addStructSpecs(List<FieldSpec> fields) {
+        for (FieldSpec field : fields) {
+            if (field.type().isStructArray()) {
+                FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
+                if (commonStructNames.contains(arrayType.elementName())) {
+                    // If we're using a common structure, we can't specify its fields.
+                    // The fields should be specified in the commonStructs area.
+                    if (!field.fields().isEmpty()) {
+                        throw new RuntimeException("Can't re-specify the common struct "
+
+                                arrayType.elementName() + " as an inline struct.");
+                    }
+                } else if (structSpecs.put(arrayType.elementName(),
+                            new StructSpec(arrayType.elementName(),
+                                    field.versions().toString(),
+                                    field.fields())) != null) {
+                    // Inline structures should only appear once.
+                    throw new RuntimeException("Struct " + arrayType.elementName() +
+                            " was specified twice.");
+                }
+                addStructSpecs(field.fields());
+            }
+        }
+    }
+
+    /**
+     * Locate the struct corresponding to a field.
+     */
+    @SuppressWarnings("unchecked")
+    StructSpec findStruct(FieldSpec field) {
+        if ((!field.type().isArray()) && (field.type().isStruct())) {
+            throw new RuntimeException("Field " + field.name() +
+                    " cannot be treated as a structure.");
+        }
+        FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
+        StructSpec struct = structSpecs.get(arrayType.elementName());
+        if (struct == null) {
+            throw new RuntimeException("Unable to locate a specification for the structure
" +
+                    arrayType.elementName());
+        }
+        return struct;
+    }
+
+    /**
+     * Return true if the field is a struct array with keys.
+     */
+    @SuppressWarnings("unchecked")
+    boolean isStructArrayWithKeys(FieldSpec field) {
+        if (!field.type().isArray()) {
+            return false;
+        }
+        FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
+        if (!arrayType.isStructArray()) {
+            return false;
+        }
+        StructSpec struct = structSpecs.get(arrayType.elementName());
+        if (struct == null) {
+            throw new RuntimeException("Unable to locate a specification for the structure
" +
+                    arrayType.elementName());
+        }
+        return struct.hasKeys();
+    }
+
+    Set<String> commonStructNames() {
+        return commonStructNames;
+    }
+
+    /**
+     * Returns an iterator that will step through all the common structures.
+     */
+    Iterator<StructSpec> commonStructs() {
+        return new Iterator<StructSpec>() {
+            private final Iterator<String> iter = commonStructNames.iterator();
+
+            @Override
+            public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override
+            public StructSpec next() {
+                return structSpecs.get(iter.next());
+            }
+        };
+    }
+}
diff --git a/generator/src/test/java/org/apache/kafka/message/StructRegistryTest.java b/generator/src/test/java/org/apache/kafka/message/StructRegistryTest.java
new file mode 100644
index 0000000..4ac16b4
--- /dev/null
+++ b/generator/src/test/java/org/apache/kafka/message/StructRegistryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.message;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StructRegistryTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testCommonStructs() throws Exception {
+        MessageSpec testMessageSpec = MessageGenerator.JSON_SERDE.readValue(String.join("",
Arrays.asList(
+                "{",
+                "  \"type\": \"request\",",
+                "  \"name\": \"LeaderAndIsrRequest\",",
+                "  \"validVersions\": \"0-2\",",
+                "  \"fields\": [",
+                "    { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },",
+                "    { \"name\": \"field2\", \"type\": \"[]TestCommonStruct\", \"versions\":
\"1+\" },",
+                "    { \"name\": \"field3\", \"type\": \"[]TestInlineStruct\", \"versions\":
\"0+\", ",
+                "    \"fields\": [",
+                "      { \"name\": \"inlineField1\", \"type\": \"int64\", \"versions\": \"0+\"
}",
+                "    ]}",
+                "  ],",
+                "  \"commonStructs\": [",
+                "    { \"name\": \"TestCommonStruct\", \"versions\": \"0+\", \"fields\":
[",
+                "      { \"name\": \"commonField1\", \"type\": \"int64\", \"versions\": \"0+\"
}",
+                "    ]}",
+                "  ]",
+                "}")), MessageSpec.class);
+        StructRegistry structRegistry = new StructRegistry();
+        structRegistry.register(testMessageSpec);
+        assertEquals(Collections.singleton("TestCommonStruct"),
+                structRegistry.commonStructNames());
+        assertFalse(structRegistry.isStructArrayWithKeys(testMessageSpec.fields().get(1)));
+        assertFalse(structRegistry.isStructArrayWithKeys(testMessageSpec.fields().get(2)));
+        assertTrue(structRegistry.commonStructs().hasNext());
+        assertEquals("TestCommonStruct", structRegistry.commonStructs().next().name());
+    }
+
+    @Test
+    public void testReSpecifiedCommonStructError() throws Exception {
+        MessageSpec testMessageSpec = MessageGenerator.JSON_SERDE.readValue(String.join("",
Arrays.asList(
+                "{",
+                "  \"type\": \"request\",",
+                "  \"name\": \"LeaderAndIsrRequest\",",
+                "  \"validVersions\": \"0-2\",",
+                "  \"fields\": [",
+                "    { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },",
+                "    { \"name\": \"field2\", \"type\": \"[]TestCommonStruct\", \"versions\":
\"0+\", ",
+                "    \"fields\": [",
+                "      { \"name\": \"inlineField1\", \"type\": \"int64\", \"versions\": \"0+\"
}",
+                "    ]}",
+                "  ],",
+                "  \"commonStructs\": [",
+                "    { \"name\": \"TestCommonStruct\", \"versions\": \"0+\", \"fields\":
[",
+                "      { \"name\": \"commonField1\", \"type\": \"int64\", \"versions\": \"0+\"
}",
+                "    ]}",
+                "  ]",
+                "}")), MessageSpec.class);
+        StructRegistry structRegistry = new StructRegistry();
+        try {
+            structRegistry.register(testMessageSpec);
+            fail("Expected StructRegistry#registry to fail");
+        } catch (RuntimeException e) {
+            assertTrue(e.getMessage().contains("Can't re-specify the common struct TestCommonStruct
" +
+                    "as an inline struct."));
+        }
+    }
+
+    @Test
+    public void testDuplicateCommonStructError() throws Exception {
+        MessageSpec testMessageSpec = MessageGenerator.JSON_SERDE.readValue(String.join("",
Arrays.asList(
+                "{",
+                "  \"type\": \"request\",",
+                "  \"name\": \"LeaderAndIsrRequest\",",
+                "  \"validVersions\": \"0-2\",",
+                "  \"fields\": [",
+                "    { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" }",
+                "  ],",
+                "  \"commonStructs\": [",
+                "    { \"name\": \"TestCommonStruct\", \"versions\": \"0+\", \"fields\":
[",
+                "      { \"name\": \"commonField1\", \"type\": \"int64\", \"versions\": \"0+\"
}",
+                "    ]},",
+                "    { \"name\": \"TestCommonStruct\", \"versions\": \"0+\", \"fields\":
[",
+                "      { \"name\": \"commonField1\", \"type\": \"int64\", \"versions\": \"0+\"
}",
+                "    ]}",
+                "  ]",
+                "}")), MessageSpec.class);
+        StructRegistry structRegistry = new StructRegistry();
+        try {
+            structRegistry.register(testMessageSpec);
+            fail("Expected StructRegistry#registry to fail");
+        } catch (RuntimeException e) {
+            assertTrue(e.getMessage().contains("Common struct TestCommonStruct was specified
twice."));
+        }
+    }
+}


Mime
View raw message