kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: MINOR: Protocol schema refactor follow-up
Date Tue, 19 Sep 2017 18:07:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5e0bb3df9 -> 92c06cbad


MINOR: Protocol schema refactor follow-up

- Use constants in a few places that were missed
- Remove ProtoUtils by moving its methods to Schema
- Merge SchemaVisitor and SchemaVisitorAdapter
- Change SchemaVisitor package.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3895 from ijuma/protocol-schema-refactor-follow-ups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92c06cba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92c06cba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92c06cba

Branch: refs/heads/trunk
Commit: 92c06cbad591b88e69f416180052a72af455fa30
Parents: 5e0bb3d
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Sep 19 11:07:32 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue Sep 19 11:07:32 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/ApiKeys.java   | 16 +++----
 .../kafka/common/protocol/ProtoUtils.java       | 47 --------------------
 .../kafka/common/protocol/SchemaVisitor.java    | 27 -----------
 .../common/protocol/SchemaVisitorAdapter.java   | 38 ----------------
 .../kafka/common/protocol/types/Schema.java     | 31 +++++++++++++
 .../requests/AlterReplicaDirResponse.java       |  6 +--
 .../common/requests/ApiVersionsResponse.java    |  2 +-
 .../requests/ControlledShutdownResponse.java    |  2 +-
 .../common/requests/DeleteRecordsResponse.java  |  4 +-
 .../common/requests/DescribeLogDirsRequest.java |  4 +-
 .../requests/DescribeLogDirsResponse.java       | 16 +++----
 .../kafka/common/requests/FetchRequest.java     |  2 +-
 .../kafka/common/requests/FetchResponse.java    |  4 +-
 .../common/requests/LeaderAndIsrRequest.java    |  3 +-
 .../common/requests/ListGroupsResponse.java     |  8 ++--
 .../common/requests/OffsetCommitResponse.java   |  1 -
 .../requests/OffsetsForLeaderEpochRequest.java  |  2 +-
 .../kafka/common/requests/RequestHeader.java    |  1 -
 .../common/requests/UpdateMetadataRequest.java  |  3 +-
 .../common/requests/WriteTxnMarkersRequest.java | 11 ++---
 .../requests/WriteTxnMarkersResponse.java       | 13 +++---
 21 files changed, 81 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 0e087eb..62dce79 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -97,7 +97,7 @@ import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
 import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
@@ -325,18 +325,16 @@ public enum ApiKeys {
     }
 
     private static boolean retainsBufferReference(Schema schema) {
-        final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE);
-        SchemaVisitor detector = new SchemaVisitorAdapter() {
+        final AtomicBoolean hasBuffer = new AtomicBoolean(false);
+        Schema.Visitor detector = new Schema.Visitor() {
             @Override
             public void visit(Type field) {
-                if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) {
-                    foundBufferReference.set(Boolean.TRUE);
-                }
+                if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS)
+                    hasBuffer.set(true);
             }
         };
-        foundBufferReference.set(Boolean.FALSE);
-        ProtoUtils.walk(schema, detector);
-        return foundBufferReference.get();
+        schema.walk(detector);
+        return hasBuffer.get();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
deleted file mode 100644
index f9be12c..0000000
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.BoundField;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Type;
-
-public class ProtoUtils {
-    public static void walk(Schema schema, SchemaVisitor visitor) {
-        if (schema == null || visitor == null) {
-            throw new IllegalArgumentException("Both schema and visitor must be provided");
-        }
-        handleNode(schema, visitor);
-    }
-
-    private static void handleNode(Type node, SchemaVisitor visitor) {
-        if (node instanceof Schema) {
-            Schema schema = (Schema) node;
-            visitor.visit(schema);
-            for (BoundField f : schema.fields()) {
-                handleNode(f.def.type, visitor);
-            }
-        } else if (node instanceof ArrayOf) {
-            ArrayOf array = (ArrayOf) node;
-            visitor.visit(array);
-            handleNode(array.type(), visitor);
-        } else {
-            visitor.visit(node);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
deleted file mode 100644
index e61cc77..0000000
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Type;
-
-public interface SchemaVisitor {
-    void visit(Schema schema);
-    void visit(ArrayOf array);
-    void visit(Type field);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
deleted file mode 100644
index 62834d0..0000000
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Type;
-
-public abstract class SchemaVisitorAdapter implements SchemaVisitor {
-    @Override
-    public void visit(Schema schema) {
-        //nop
-    }
-
-    @Override
-    public void visit(ArrayOf array) {
-        //nop
-    }
-
-    @Override
-    public void visit(Type field) {
-        //nop
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 187e14b..faa1540 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.protocol.types;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * The schema for a compound record definition
@@ -164,4 +165,34 @@ public class Schema extends Type {
         }
     }
 
+    public void walk(Visitor visitor) {
+        Objects.requireNonNull(visitor, "visitor must be non-null");
+        handleNode(this, visitor);
+    }
+
+    private static void handleNode(Type node, Visitor visitor) {
+        if (node instanceof Schema) {
+            Schema schema = (Schema) node;
+            visitor.visit(schema);
+            for (BoundField f : schema.fields())
+                handleNode(f.def.type, visitor);
+        } else if (node instanceof ArrayOf) {
+            ArrayOf array = (ArrayOf) node;
+            visitor.visit(array);
+            handleNode(array.type(), visitor);
+        } else {
+            visitor.visit(node);
+        }
+    }
+
+    /**
+     * Override one or more of the visit methods with the desired logic.
+     */
+    public static abstract class Visitor {
+        public void visit(Schema schema) {}
+        public void visit(ArrayOf array) {}
+        public void visit(Type field) {}
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
index ed00b75..1767d45 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -48,9 +48,9 @@ public class AlterReplicaDirResponse extends AbstractResponse {
 
     private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
-            new Field("topics", new ArrayOf(new Schema(
+            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
                     TOPIC_NAME,
-                    new Field("partitions", new ArrayOf(new Schema(
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
                             PARTITION_ID,
                             ERROR_CODE)))))));
 
@@ -127,4 +127,4 @@ public class AlterReplicaDirResponse extends AbstractResponse {
     public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
         return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 6a0418f..2bdc8aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -165,7 +165,7 @@ public class ApiVersionsResponse extends AbstractResponse {
 
     private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
         Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
-        for (ApiVersion apiVersion: apiVersions) {
+        for (ApiVersion apiVersion : apiVersions) {
             tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion);
         }
         return tempApiIdToApiVersion;

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index e0b3860..dfd68e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -44,7 +44,7 @@ public class ControlledShutdownResponse extends AbstractResponse {
 
     private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(
             ERROR_CODE,
-            new Field("partitions_remaining", new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " +
+            new Field(PARTITIONS_REMAINING_KEY_NAME, new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " +
                     "that the broker still leads."));
 
     private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index aeea1cd..5bfdec8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -62,7 +62,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
 
     private static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
-            new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+            new Field(TOPICS_KEY_NAME, new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{DELETE_RECORDS_RESPONSE_V0};
@@ -164,4 +164,4 @@ public class DeleteRecordsResponse extends AbstractResponse {
     public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
         return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
index 0169da5..5f35c43 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
@@ -45,9 +45,9 @@ public class DescribeLogDirsRequest extends AbstractRequest {
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema(
-            new Field("topics", ArrayOf.nullable(new Schema(
+            new Field(TOPICS_KEY_NAME, ArrayOf.nullable(new Schema(
                     TOPIC_NAME,
-                    new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")))));
+                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic.")))));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0};

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index e35056e..dc226d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -62,18 +62,18 @@ public class DescribeLogDirsResponse extends AbstractResponse {
 
     private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
-            new Field("log_dirs", new ArrayOf(new Schema(
+            new Field(LOG_DIRS_KEY_NAME, new ArrayOf(new Schema(
                     ERROR_CODE,
-                    new Field("log_dir", STRING, "The absolute log directory path."),
-                    new Field("topics", new ArrayOf(new Schema(
+                    new Field(LOG_DIR_KEY_NAME, STRING, "The absolute log directory path."),
+                    new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
                             TOPIC_NAME,
-                            new Field("partitions", new ArrayOf(new Schema(
+                            new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
                                     PARTITION_ID,
-                                    new Field("size", INT64, "The size of the log segments of the partition in bytes."),
-                                    new Field("offset_lag", INT64, "The lag of the log's LEO w.r.t. partition's HW " +
+                                    new Field(SIZE_KEY_NAME, INT64, "The size of the log segments of the partition in bytes."),
+                                    new Field(OFFSET_LAG_KEY_NAME, INT64, "The lag of the log's LEO w.r.t. partition's HW " +
                                             "(if it is the current log for the partition) or current replica's LEO " +
                                             "(if it is the future log for the partition)"),
-                                    new Field("is_future", BOOLEAN, "True if this log is created by " +
+                                    new Field(IS_FUTURE_KEY_NAME, BOOLEAN, "True if this log is created by " +
                                             "AlterReplicaDirRequest and will replace the current log of the replica " +
                                             "in the future.")))))))))));
 
@@ -211,4 +211,4 @@ public class DescribeLogDirsResponse extends AbstractResponse {
             return builder.toString();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 4a60c94..3fea26c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -60,7 +60,7 @@ public class FetchRequest extends AbstractRequest {
             new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."),
             new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch."));
 
-    // FETCH_REQUEST_PARTITION_V1 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+    // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
     private static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(
             PARTITION_ID,
             new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."),

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 417e845..f8d3090 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -84,8 +84,10 @@ public class FetchResponse extends AbstractResponse {
             new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
     // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
     // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1
-    // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
+    // (magic byte 0 and 1). For details, see Records, RecordBatch and Record.
     private static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
+
+    // The partition ordering is now relevant - partitions will be processed in order they appear in request.
     private static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
 
     // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 73f037f..27aaf0a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -93,7 +93,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
             new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
             new Field(LIVE_LEADERS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
 
-    // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should have existed on the broker or not.
+    // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should
+    // have existed on the broker or not.
     private static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema(
             new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."),
             new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."),

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index cdf4c59..8f48f39 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -38,15 +38,15 @@ public class ListGroupsResponse extends AbstractResponse {
     private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
 
     private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(
-            new Field("group_id", STRING),
-            new Field("protocol_type", STRING));
+            new Field(GROUP_ID_KEY_NAME, STRING),
+            new Field(PROTOCOL_TYPE_KEY_NAME, STRING));
     private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(
             ERROR_CODE,
-            new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+            new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
     private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
             THROTTLE_TIME_MS,
             ERROR_CODE,
-            new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
+            new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
 
     public static Schema[] schemaVersions() {
         return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 0181eef..13484ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -85,7 +85,6 @@ public class OffsetCommitResponse extends AbstractResponse {
             OFFSET_COMMIT_RESPONSE_V3};
     }
 
-
     private final Map<TopicPartition, Errors> responseData;
     private final int throttleTimeMs;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index b5fce78..d0585be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -43,7 +43,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     /* Offsets for Leader Epoch api */
     private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema(
             PARTITION_ID,
-            new Field("leader_epoch", INT32, "The epoch"));
+            new Field(LEADER_EPOCH, INT32, "The epoch"));
     private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0 = new Schema(
             TOPIC_NAME,
             new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 1284e7e..956d813 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -157,7 +157,6 @@ public class RequestHeader extends AbstractRequestResponse {
         return result;
     }
 
-
     private static Schema schema(short apiKey, short version) {
         if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN.id && version == 0)
             // This will be removed once we remove support for v0 of ControlledShutdownRequest, which

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 67ae8e1..6c36bda 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -126,7 +126,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
 
     private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2;
 
-    // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies
+    // the list of replicas that are offline.
     private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 = new Schema(
             TOPIC_NAME,
             PARTITION_ID,

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 96dfb2f..3f7a0c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -40,7 +40,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT64;
 
 public class WriteTxnMarkersRequest extends AbstractRequest {
     private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
-    private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
+    private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
 
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
@@ -60,7 +60,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
                     "hosted by this transaction coordinator"));
 
     private static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
-            new Field(TXN_MARKER_ENTRY_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to be written."));
+            new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to " +
+                    "be written."));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{WRITE_TXN_MARKERS_REQUEST_V0};
@@ -160,7 +161,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     public WriteTxnMarkersRequest(Struct struct, short version) {
         super(version);
         List<TxnMarkerEntry> markers = new ArrayList<>();
-        Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
+        Object[] markersArray = struct.getArray(TXN_MARKERS_KEY_NAME);
         for (Object markerObj : markersArray) {
             Struct markerStruct = (Struct) markerObj;
 
@@ -197,7 +198,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         Object[] markersArray = new Object[markers.size()];
         int i = 0;
         for (TxnMarkerEntry entry : markers) {
-            Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
+            Struct markerStruct = struct.instance(TXN_MARKERS_KEY_NAME);
             markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId);
             markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch);
             markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
@@ -215,7 +216,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             markerStruct.set(TOPICS_KEY_NAME, partitionsArray);
             markersArray[i++] = markerStruct;
         }
-        struct.set(TXN_MARKER_ENTRY_KEY_NAME, markersArray);
+        struct.set(TXN_MARKERS_KEY_NAME, markersArray);
 
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 3372670..797fb59 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -35,7 +35,7 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
 
 public class WriteTxnMarkersResponse extends AbstractResponse {
-    private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
+    private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
 
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String TOPICS_KEY_NAME = "topics";
@@ -45,7 +45,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
             PARTITION_ID,
             ERROR_CODE);
 
-    private static final Schema WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0 = new Schema(
+    private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
             new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
             new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
                     TOPIC_NAME,
@@ -53,7 +53,8 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
                     "Errors per partition from writing markers."));
 
     private static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
-            new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers."));
+            new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "Errors per partition from " +
+                    "writing markers."));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{WRITE_TXN_MARKERS_RESPONSE_V0};
@@ -82,7 +83,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
     public WriteTxnMarkersResponse(Struct struct) {
         Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>();
 
-        Object[] responseArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
+        Object[] responseArray = struct.getArray(TXN_MARKERS_KEY_NAME);
         for (Object responseObj : responseArray) {
             Struct responseStruct = (Struct) responseObj;
 
@@ -113,7 +114,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
         Object[] responsesArray = new Object[errors.size()];
         int k = 0;
         for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
-            Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
+            Struct responseStruct = struct.instance(TXN_MARKERS_KEY_NAME);
             responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey());
 
             Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
@@ -141,7 +142,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
             responsesArray[k++] = responseStruct;
         }
 
-        struct.set(TXN_MARKER_ENTRY_KEY_NAME, responsesArray);
+        struct.set(TXN_MARKERS_KEY_NAME, responsesArray);
         return struct;
     }
 


Mime
View raw message