http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java new file mode 100644 index 0000000..b031b4f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol.types; + +/** + * A field definition bound to a particular schema. 
+ */ +public class BoundField { + public final Field def; + final int index; + final Schema schema; + + public BoundField(Field def, Schema schema, int index) { + this.def = def; + this.schema = schema; + this.index = index; + } + + @Override + public String toString() { + return def.name + ":" + def.type; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java index 29a89d4..8da848b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java @@ -16,63 +16,67 @@ */ package org.apache.kafka.common.protocol.types; -/** - * A field in a schema - */ public class Field { - - public static final Object NO_DEFAULT = new Object(); - - final int index; public final String name; + public final String docString; public final Type type; + public final boolean hasDefaultValue; public final Object defaultValue; - public final String doc; - final Schema schema; - /** - * Create the field. 
- * - * @throws SchemaException If the default value is not primitive and the validation fails - */ - public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) { - this.index = index; + public Field(String name, Type type, String docString, boolean hasDefaultValue, Object defaultValue) { this.name = name; + this.docString = docString; this.type = type; - this.doc = doc; + this.hasDefaultValue = hasDefaultValue; this.defaultValue = defaultValue; - this.schema = schema; - if (defaultValue != NO_DEFAULT) + + if (hasDefaultValue) type.validate(defaultValue); } - public Field(int index, String name, Type type, String doc, Object defaultValue) { - this(index, name, type, doc, defaultValue, null); + public Field(String name, Type type, String docString) { + this(name, type, docString, false, null); } - public Field(String name, Type type, String doc, Object defaultValue) { - this(-1, name, type, doc, defaultValue); + public Field(String name, Type type, String docString, Object defaultValue) { + this(name, type, docString, true, defaultValue); } - public Field(String name, Type type, String doc) { - this(name, type, doc, NO_DEFAULT); + public Field(String name, Type type) { + this(name, type, null, false, null); } - public Field(String name, Type type) { - this(name, type, ""); + public static class Int8 extends Field { + public Int8(String name, String docString) { + super(name, Type.INT8, docString, false, null); + } } - public Type type() { - return type; + public static class Int32 extends Field { + public Int32(String name, String docString) { + super(name, Type.INT32, docString, false, null); + } + + public Int32(String name, String docString, int defaultValue) { + super(name, Type.INT32, docString, true, defaultValue); + } } - public Schema schema() { - return schema; + public static class Int16 extends Field { + public Int16(String name, String docString) { + super(name, Type.INT16, docString, false, null); + } } + public 
static class Str extends Field { + public Str(String name, String docString) { + super(name, Type.STRING, docString, false, null); + } + } - @Override - public String toString() { - return name + ":" + type; + public static class NullableStr extends Field { + public NullableStr(String name, String docString) { + super(name, Type.NULLABLE_STRING, docString, false, null); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index a9c08aa..187e14b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -25,8 +25,8 @@ import java.util.Map; */ public class Schema extends Type { - private final Field[] fields; - private final Map fieldsByName; + private final BoundField[] fields; + private final Map fieldsByName; /** * Construct the schema with a given list of its field values @@ -34,14 +34,14 @@ public class Schema extends Type { * @throws SchemaException If the given list have duplicate fields */ public Schema(Field... 
fs) { - this.fields = new Field[fs.length]; + this.fields = new BoundField[fs.length]; this.fieldsByName = new HashMap<>(); for (int i = 0; i < this.fields.length; i++) { - Field field = fs[i]; - if (fieldsByName.containsKey(field.name)) - throw new SchemaException("Schema contains a duplicate field: " + field.name); - this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this); - this.fieldsByName.put(fs[i].name, this.fields[i]); + Field def = fs[i]; + if (fieldsByName.containsKey(def.name)) + throw new SchemaException("Schema contains a duplicate field: " + def.name); + this.fields[i] = new BoundField(def, this, i); + this.fieldsByName.put(def.name, this.fields[i]); } } @@ -51,12 +51,12 @@ public class Schema extends Type { @Override public void write(ByteBuffer buffer, Object o) { Struct r = (Struct) o; - for (Field field : fields) { + for (BoundField field : fields) { try { - Object value = field.type().validate(r.get(field)); - field.type.write(buffer, value); + Object value = field.def.type.validate(r.get(field)); + field.def.type.write(buffer, value); } catch (Exception e) { - throw new SchemaException("Error writing field '" + field.name + "': " + + throw new SchemaException("Error writing field '" + field.def.name + "': " + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } @@ -70,9 +70,9 @@ public class Schema extends Type { Object[] objects = new Object[fields.length]; for (int i = 0; i < fields.length; i++) { try { - objects[i] = fields[i].type.read(buffer); + objects[i] = fields[i].def.type.read(buffer); } catch (Exception e) { - throw new SchemaException("Error reading field '" + fields[i].name + "': " + + throw new SchemaException("Error reading field '" + fields[i].def.name + "': " + (e.getMessage() == null ? 
e.getClass().getName() : e.getMessage())); } } @@ -86,11 +86,11 @@ public class Schema extends Type { public int sizeOf(Object o) { int size = 0; Struct r = (Struct) o; - for (Field field : fields) { + for (BoundField field : fields) { try { - size += field.type.sizeOf(r.get(field)); + size += field.def.type.sizeOf(r.get(field)); } catch (Exception e) { - throw new SchemaException("Error computing size for field '" + field.name + "': " + + throw new SchemaException("Error computing size for field '" + field.def.name + "': " + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } @@ -110,7 +110,7 @@ public class Schema extends Type { * @param slot The slot at which this field sits * @return The field */ - public Field get(int slot) { + public BoundField get(int slot) { return this.fields[slot]; } @@ -120,14 +120,14 @@ public class Schema extends Type { * @param name The name of the field * @return The field */ - public Field get(String name) { + public BoundField get(String name) { return this.fieldsByName.get(name); } /** * Get all the fields in this schema */ - public Field[] fields() { + public BoundField[] fields() { return this.fields; } @@ -151,11 +151,11 @@ public class Schema extends Type { public Struct validate(Object item) { try { Struct struct = (Struct) item; - for (Field field : fields) { + for (BoundField field : fields) { try { - field.type.validate(struct.get(field)); + field.def.type.validate(struct.get(field)); } catch (SchemaException e) { - throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage()); + throw new SchemaException("Invalid value for field '" + field.def.name + "': " + e.getMessage()); } } return struct; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index c42390b..b3e9975 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -51,16 +51,16 @@ public class Struct { * @param field The field for which to get the default value * @throws SchemaException if the field has no value and has no default. */ - private Object getFieldOrDefault(Field field) { + private Object getFieldOrDefault(BoundField field) { Object value = this.values[field.index]; if (value != null) return value; - else if (field.defaultValue != Field.NO_DEFAULT) - return field.defaultValue; - else if (field.type.isNullable()) + else if (field.def.hasDefaultValue) + return field.def.defaultValue; + else if (field.def.type.isNullable()) return null; else - throw new SchemaException("Missing value for field '" + field.name + "' which has no default value."); + throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value."); } /** @@ -70,11 +70,43 @@ public class Struct { * @return The value for that field. * @throws SchemaException if the field has no value and has no default. 
*/ - public Object get(Field field) { + public Object get(BoundField field) { validateField(field); return getFieldOrDefault(field); } + public Byte get(Field.Int8 field) { + return getByte(field.name); + } + + public Integer get(Field.Int32 field) { + return getInt(field.name); + } + + public Short get(Field.Int16 field) { + return getShort(field.name); + } + + public String get(Field.Str field) { + return getString(field.name); + } + + public String get(Field.NullableStr field) { + return getString(field.name); + } + + public Integer getOrElse(Field.Int32 field, int alternative) { + if (hasField(field.name)) + return getInt(field.name); + return alternative; + } + + public String getOrElse(Field.NullableStr field, String alternative) { + if (hasField(field.name)) + return getString(field.name); + return alternative; + } + /** * Get the record value for the field with the given name by doing a hash table lookup (slower!) * @@ -83,7 +115,7 @@ public class Struct { * @throws SchemaException If no such field exists */ public Object get(String name) { - Field field = schema.get(name); + BoundField field = schema.get(name); if (field == null) throw new SchemaException("No such field: " + name); return getFieldOrDefault(field); @@ -98,7 +130,11 @@ public class Struct { return schema.get(name) != null; } - public Struct getStruct(Field field) { + public boolean hasField(Field def) { + return schema.get(def.name) != null; + } + + public Struct getStruct(BoundField field) { return (Struct) get(field); } @@ -106,7 +142,7 @@ public class Struct { return (Struct) get(name); } - public Byte getByte(Field field) { + public Byte getByte(BoundField field) { return (Byte) get(field); } @@ -118,7 +154,7 @@ public class Struct { return (Records) get(name); } - public Short getShort(Field field) { + public Short getShort(BoundField field) { return (Short) get(field); } @@ -126,7 +162,7 @@ public class Struct { return (Short) get(name); } - public Integer getInt(Field field) { + 
public Integer getInt(BoundField field) { return (Integer) get(field); } @@ -138,7 +174,7 @@ public class Struct { return (Long) get(name); } - public Long getLong(Field field) { + public Long getLong(BoundField field) { return (Long) get(field); } @@ -146,7 +182,7 @@ public class Struct { return (Long) get(name); } - public Object[] getArray(Field field) { + public Object[] getArray(BoundField field) { return (Object[]) get(field); } @@ -154,7 +190,7 @@ public class Struct { return (Object[]) get(name); } - public String getString(Field field) { + public String getString(BoundField field) { return (String) get(field); } @@ -162,7 +198,7 @@ public class Struct { return (String) get(name); } - public Boolean getBoolean(Field field) { + public Boolean getBoolean(BoundField field) { return (Boolean) get(field); } @@ -170,7 +206,7 @@ public class Struct { return (Boolean) get(name); } - public ByteBuffer getBytes(Field field) { + public ByteBuffer getBytes(BoundField field) { Object result = get(field); if (result instanceof byte[]) return ByteBuffer.wrap((byte[]) result); @@ -191,7 +227,7 @@ public class Struct { * @param value The value * @throws SchemaException If the validation of the field failed */ - public Struct set(Field field, Object value) { + public Struct set(BoundField field, Object value) { validateField(field); this.values[field.index] = value; return this; @@ -205,13 +241,40 @@ public class Struct { * @throws SchemaException If the field is not known */ public Struct set(String name, Object value) { - Field field = this.schema.get(name); + BoundField field = this.schema.get(name); if (field == null) throw new SchemaException("Unknown field: " + name); this.values[field.index] = value; return this; } + public Struct set(Field.Str def, String value) { + return set(def.name, value); + } + + public Struct set(Field.NullableStr def, String value) { + return set(def.name, value); + } + + public Struct set(Field.Int8 def, byte value) { + return set(def.name, 
value); + } + + public Struct set(Field.Int32 def, int value) { + return set(def.name, value); + } + + public Struct set(Field.Int16 def, short value) { + return set(def.name, value); + } + + public Struct setIfExists(Field def, Object value) { + BoundField field = this.schema.get(def.name); + if (field != null) + this.values[field.index] = value; + return this; + } + /** * Create a struct for the schema of a container type (struct or array). Note that for array type, this method * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be @@ -221,15 +284,15 @@ public class Struct { * @return The struct * @throws SchemaException If the given field is not a container type */ - public Struct instance(Field field) { + public Struct instance(BoundField field) { validateField(field); - if (field.type() instanceof Schema) { - return new Struct((Schema) field.type()); - } else if (field.type() instanceof ArrayOf) { - ArrayOf array = (ArrayOf) field.type(); + if (field.def.type instanceof Schema) { + return new Struct((Schema) field.def.type); + } else if (field.def.type instanceof ArrayOf) { + ArrayOf array = (ArrayOf) field.def.type; return new Struct((Schema) array.type()); } else { - throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type()); + throw new SchemaException("Field '" + field.def.name + "' is not a container type, it is of type " + field.def.type); } } @@ -270,9 +333,9 @@ public class Struct { * * @throws SchemaException If validation fails */ - private void validateField(Field field) { + private void validateField(BoundField field) { if (this.schema != field.schema) - throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance."); + throw new SchemaException("Attempt to access field '" + field.def.name + "' from a different schema instance."); if (field.index > values.length) throw new 
SchemaException("Invalid field index: " + field.index); } @@ -291,10 +354,10 @@ public class Struct { StringBuilder b = new StringBuilder(); b.append('{'); for (int i = 0; i < this.values.length; i++) { - Field f = this.schema.get(i); - b.append(f.name); + BoundField f = this.schema.get(i); + b.append(f.def.name); b.append('='); - if (f.type() instanceof ArrayOf && this.values[i] != null) { + if (f.def.type instanceof ArrayOf && this.values[i] != null) { Object[] arrayValue = (Object[]) this.values[i]; b.append('['); for (int j = 0; j < arrayValue.length; j++) { @@ -317,8 +380,8 @@ public class Struct { final int prime = 31; int result = 1; for (int i = 0; i < this.values.length; i++) { - Field f = this.schema.get(i); - if (f.type() instanceof ArrayOf) { + BoundField f = this.schema.get(i); + if (f.def.type instanceof ArrayOf) { if (this.get(f) != null) { Object[] arrayObject = (Object[]) this.get(f); for (Object arrayItem: arrayObject) @@ -346,9 +409,9 @@ public class Struct { if (schema != other.schema) return false; for (int i = 0; i < this.values.length; i++) { - Field f = this.schema.get(i); + BoundField f = this.schema.get(i); boolean result; - if (f.type() instanceof ArrayOf) { + if (f.def.type instanceof ArrayOf) { result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f)); } else { Object thisField = this.get(f); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 34fda50..1f1418f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -133,9 +133,9 @@ public abstract class AbstractRequest extends 
AbstractRequestResponse { return new SyncGroupRequest(struct, apiVersion); case STOP_REPLICA: return new StopReplicaRequest(struct, apiVersion); - case CONTROLLED_SHUTDOWN_KEY: + case CONTROLLED_SHUTDOWN: return new ControlledShutdownRequest(struct, apiVersion); - case UPDATE_METADATA_KEY: + case UPDATE_METADATA: return new UpdateMetadataRequest(struct, apiVersion); case LEADER_AND_ISR: return new LeaderAndIsrRequest(struct, apiVersion); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 95d1ef9..b6cb8fb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public abstract class AbstractResponse extends AbstractRequestResponse { - public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final int DEFAULT_THROTTLE_TIME = 0; protected Send toSend(String destination, ResponseHeader header, short apiVersion) { @@ -66,9 +65,9 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new SyncGroupResponse(struct); case STOP_REPLICA: return new StopReplicaResponse(struct); - case CONTROLLED_SHUTDOWN_KEY: + case CONTROLLED_SHUTDOWN: return new ControlledShutdownResponse(struct); - case UPDATE_METADATA_KEY: + case UPDATE_METADATA: return new UpdateMetadataResponse(struct); case LEADER_AND_ISR: return new LeaderAndIsrResponse(struct); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java index 36b290f..e3e4d79 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java @@ -18,16 +18,32 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class AddOffsetsToTxnRequest extends AbstractRequest { private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String EPOCH_KEY_NAME = "producer_epoch"; private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; + private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema( + new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."), + new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), + new Field(EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."), + new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Consumer group id whose offsets should be included in the transaction.")); + + public static Schema[] schemaVersions() { + return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0}; + } + public static class Builder extends 
AbstractRequest.Builder { private final String transactionalId; private final long producerId; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java index 981a234..10dc279 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java @@ -18,12 +18,22 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + public class AddOffsetsToTxnResponse extends AbstractResponse { - private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE); + + public static Schema[] schemaVersions() { + return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0}; + } // Possible error codes: // NotCoordinator @@ -44,8 +54,8 @@ public class AddOffsetsToTxnResponse extends AbstractResponse { } public AddOffsetsToTxnResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); + this.error = Errors.forCode(struct.get(ERROR_CODE)); } public int throttleTimeMs() { @@ -59,8 +69,8 @@ public class AddOffsetsToTxnResponse 
extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(ERROR_CODE, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java index 6fe034c..c195e24 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java @@ -19,6 +19,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; @@ -28,14 +31,32 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class AddPartitionsToTxnRequest extends AbstractRequest { private static final String 
TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; - private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; - private static final String TOPIC_KEY_NAME = "topic"; + private static final String TOPICS_KEY_NAME = "topics"; private static final String PARTITIONS_KEY_NAME = "partitions"; + private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema( + new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."), + new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), + new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."), + new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))), + "The partitions to add to the transaction.")); + + public static Schema[] schemaVersions() { + return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0}; + } + public static class Builder extends AbstractRequest.Builder { private final String transactionalId; private final long producerId; @@ -93,10 +114,10 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); List partitions = new ArrayList<>(); - Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME); + Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME); for (Object topicPartitionObj : topicPartitionsArray) { Struct topicPartitionStruct = (Struct) topicPartitionObj; - String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME); + String topic = topicPartitionStruct.get(TOPIC_NAME); for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) { partitions.add(new TopicPartition(topic, (Integer) partitionObj)); } @@ -131,13 +152,13 @@ public class AddPartitionsToTxnRequest 
extends AbstractRequest { Object[] partitionsArray = new Object[mappedPartitions.size()]; int i = 0; for (Map.Entry> topicAndPartitions : mappedPartitions.entrySet()) { - Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME); - topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey()); + Struct topicPartitionsStruct = struct.instance(TOPICS_KEY_NAME); + topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey()); topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray()); partitionsArray[i++] = topicPartitionsStruct; } - struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray); + struct.set(TOPICS_KEY_NAME, partitionsArray); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java index f05310a..e9f6088 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java @@ -19,6 +19,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; @@ -28,13 +31,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static 
org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; + public class AddPartitionsToTxnResponse extends AbstractResponse { - private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String ERRORS_KEY_NAME = "errors"; - private static final String TOPIC_NAME = "topic"; - private static final String PARTITION = "partition"; private static final String PARTITION_ERRORS = "partition_errors"; + private static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field(ERRORS_KEY_NAME, new ArrayOf(new Schema( + TOPIC_NAME, + new Field(PARTITION_ERRORS, new ArrayOf(new Schema( + PARTITION_ID, + ERROR_CODE))))))); + + public static Schema[] schemaVersions() { + return new Schema[]{ADD_PARTITIONS_TO_TXN_RESPONSE_V0}; + } + private final int throttleTimeMs; // Possible error codes: @@ -56,15 +73,15 @@ public class AddPartitionsToTxnResponse extends AbstractResponse { } public AddPartitionsToTxnResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); errors = new HashMap<>(); for (Object topic : struct.getArray(ERRORS_KEY_NAME)) { Struct topicStruct = (Struct) topic; - final String topicName = topicStruct.getString(TOPIC_NAME); + final String topicName = topicStruct.get(TOPIC_NAME); for (Object partition : topicStruct.getArray(PARTITION_ERRORS)) { Struct partitionStruct = (Struct) partition; - TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.getInt(PARTITION)); - errors.put(topicPartition, Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME))); + TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.get(PARTITION_ID)); + errors.put(topicPartition, Errors.forCode(partitionStruct.get(ERROR_CODE))); } } } @@ -80,7 +97,7 
@@ public class AddPartitionsToTxnResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); Map> errorsByTopic = CollectionUtils.groupDataByTopic(errors); List topics = new ArrayList<>(errorsByTopic.size()); @@ -90,8 +107,8 @@ public class AddPartitionsToTxnResponse extends AbstractResponse { List partitionArray = new ArrayList<>(); for (Map.Entry partitionErrors : entry.getValue().entrySet()) { final Struct partitionData = topicErrorCodes.instance(PARTITION_ERRORS) - .set(PARTITION, partitionErrors.getKey()) - .set(ERROR_CODE_KEY_NAME, partitionErrors.getValue().code()); + .set(PARTITION_ID, partitionErrors.getKey()) + .set(ERROR_CODE, partitionErrors.getValue().code()); partitionArray.add(partitionData); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index a964f85..14b39ae 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,6 +30,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static 
org.apache.kafka.common.protocol.types.Type.BOOLEAN; +import static org.apache.kafka.common.protocol.types.Type.INT8; +import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class AlterConfigsRequest extends AbstractRequest { private static final String RESOURCES_KEY_NAME = "resources"; @@ -38,6 +46,24 @@ public class AlterConfigsRequest extends AbstractRequest { private static final String CONFIG_NAME = "config_name"; private static final String CONFIG_VALUE = "config_value"; + private static final Schema CONFIG_ENTRY = new Schema( + new Field(CONFIG_NAME, STRING, "Configuration name"), + new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value")); + + private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( + new Field(RESOURCE_TYPE_KEY_NAME, INT8), + new Field(RESOURCE_NAME_KEY_NAME, STRING), + new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY))); + + private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema( + new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0), + "An array of resources to update with the provided configs."), + new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN)); + + public static Schema[] schemaVersions() { + return new Schema[] {ALTER_CONFIGS_REQUEST_V0}; + } + public static class Config { private final Collection entries; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java index 3a3eb9a..df9416e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -26,12 +29,32 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.INT8; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class AlterConfigsResponse extends AbstractResponse { private static final String RESOURCES_KEY_NAME = "resources"; private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; + private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( + ERROR_CODE, + ERROR_MESSAGE, + new Field(RESOURCE_TYPE_KEY_NAME, INT8), + new Field(RESOURCE_NAME_KEY_NAME, STRING)); + + private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0))); + + public static Schema[] schemaVersions() { + return new Schema[]{ALTER_CONFIGS_RESPONSE_V0}; + } + private final int throttleTimeMs; private final Map errors; @@ -42,7 +65,7 @@ public class AlterConfigsResponse extends AbstractResponse { } public AlterConfigsResponse(Struct struct) { - throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + throttleTimeMs = struct.get(THROTTLE_TIME_MS); Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); errors = new 
HashMap<>(resourcesArray.length); for (Object resourceObj : resourcesArray) { @@ -65,7 +88,7 @@ public class AlterConfigsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); List resourceStructs = new ArrayList<>(errors.size()); for (Map.Entry entry : errors.entrySet()) { Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java index 2c2401b..7e58fd6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java @@ -20,14 +20,21 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.STRING; public class AlterReplicaDirRequest 
extends AbstractRequest { @@ -39,9 +46,19 @@ public class AlterReplicaDirRequest extends AbstractRequest { private static final String TOPICS_KEY_NAME = "topics"; // topic level key names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; + private static final Schema ALTER_REPLICA_DIR_REQUEST_V0 = new Schema( + new Field("log_dirs", new ArrayOf(new Schema( + new Field("log_dir", STRING, "The absolute log directory path."), + new Field("topics", new ArrayOf(new Schema( + TOPIC_NAME, + new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")))))))); + + public static Schema[] schemaVersions() { + return new Schema[]{ALTER_REPLICA_DIR_REQUEST_V0}; + } + private final Map partitionDirs; public static class Builder extends AbstractRequest.Builder { @@ -76,7 +93,7 @@ public class AlterReplicaDirRequest extends AbstractRequest { String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME); for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) { Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.getString(TOPIC_KEY_NAME); + String topic = topicStruct.get(TOPIC_NAME); for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { int partition = (Integer) partitionObj; partitionDirs.put(new TopicPartition(topic, partition), logDir); @@ -108,7 +125,7 @@ public class AlterReplicaDirRequest extends AbstractRequest { List topicStructArray = new ArrayList<>(); for (Map.Entry> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) { Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_KEY_NAME, topicEntry.getKey()); + topicStruct.set(TOPIC_NAME, topicEntry.getKey()); topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); topicStructArray.add(topicStruct); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java index f97f9a0..ed00b75 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java @@ -20,14 +20,23 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; + public class AlterReplicaDirResponse extends AbstractResponse { @@ -35,12 +44,19 @@ public class AlterReplicaDirResponse extends AbstractResponse { private static final String TOPICS_KEY_NAME = "topics"; // topic level key names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; - // partition level key names - private static final String PARTITION_KEY_NAME = "partition"; - private static final String 
ERROR_CODE_KEY_NAME = "error_code"; + private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field("topics", new ArrayOf(new Schema( + TOPIC_NAME, + new Field("partitions", new ArrayOf(new Schema( + PARTITION_ID, + ERROR_CODE))))))); + + public static Schema[] schemaVersions() { + return new Schema[]{ALTER_REPLICA_DIR_RESPONSE_V0}; + } /** * Possible error code: @@ -54,15 +70,15 @@ public class AlterReplicaDirResponse extends AbstractResponse { private final int throttleTimeMs; public AlterReplicaDirResponse(Struct struct) { - throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + throttleTimeMs = struct.get(THROTTLE_TIME_MS); responses = new HashMap<>(); for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.getString(TOPIC_KEY_NAME); + String topic = topicStruct.get(TOPIC_NAME); for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { Struct partitionStruct = (Struct) partitionStructObj; - int partition = partitionStruct.getInt(PARTITION_KEY_NAME); - Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME)); + int partition = partitionStruct.get(PARTITION_ID); + Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE)); responses.put(new TopicPartition(topic, partition), error); } } @@ -79,18 +95,18 @@ public class AlterReplicaDirResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); Map> responsesByTopic = CollectionUtils.groupDataByTopic(responses); List topicStructArray = new ArrayList<>(); for (Map.Entry> responsesByTopicEntry : responsesByTopic.entrySet()) { Struct topicStruct = struct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_KEY_NAME, 
responsesByTopicEntry.getKey()); + topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey()); List partitionStructArray = new ArrayList<>(); for (Map.Entry responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) { Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); Errors response = responsesByPartitionEntry.getValue(); - partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey()); - partitionStruct.set(ERROR_CODE_KEY_NAME, response.code()); + partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey()); + partitionStruct.set(ERROR_CODE, response.code()); partitionStructArray.add(partitionStruct); } topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java index d712123..dad21b3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; + /** * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only * defined if it adds information over the default message associated with the error code. 
@@ -31,9 +34,6 @@ public class ApiError { public static final ApiError NONE = new ApiError(Errors.NONE, null); - private static final String CODE_KEY_NAME = "error_code"; - private static final String MESSAGE_KEY_NAME = "error_message"; - private final Errors error; private final String message; @@ -45,12 +45,9 @@ public class ApiError { } public ApiError(Struct struct) { - error = Errors.forCode(struct.getShort(CODE_KEY_NAME)); + error = Errors.forCode(struct.get(ERROR_CODE)); // In some cases, the error message field was introduced in newer version - if (struct.hasField(MESSAGE_KEY_NAME)) - message = struct.getString(MESSAGE_KEY_NAME); - else - message = null; + message = struct.getOrElse(ERROR_MESSAGE, null); } public ApiError(Errors error, String message) { @@ -59,10 +56,9 @@ public class ApiError { } public void write(Struct struct) { - struct.set(CODE_KEY_NAME, error.code()); - // In some cases, the error message field was introduced in a newer protocol API version - if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != Errors.NONE) - struct.set(MESSAGE_KEY_NAME, message); + struct.set(ERROR_CODE, error.code()); + if (error != Errors.NONE) + struct.setIfExists(ERROR_MESSAGE, message); } public boolean is(Errors error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 025ef6c..22daf6c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -18,12 +18,22 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import 
org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.Collections; public class ApiVersionsRequest extends AbstractRequest { + private static final Schema API_VERSIONS_REQUEST_V0 = new Schema(); + + /* v1 request is the same as v0. Throttle time has been added to response */ + private static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0; + + public static Schema[] schemaVersions() { + return new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder { public Builder() { http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 5a48c93..6a0418f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.RecordBatch; @@ -28,15 +31,35 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ApiVersionsResponse extends AbstractResponse { +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static 
org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.INT16; - public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE); - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; - public static final String ERROR_CODE_KEY_NAME = "error_code"; - public static final String API_VERSIONS_KEY_NAME = "api_versions"; - public static final String API_KEY_NAME = "api_key"; - public static final String MIN_VERSION_KEY_NAME = "min_version"; - public static final String MAX_VERSION_KEY_NAME = "max_version"; +public class ApiVersionsResponse extends AbstractResponse { + private static final String API_VERSIONS_KEY_NAME = "api_versions"; + private static final String API_KEY_NAME = "api_key"; + private static final String MIN_VERSION_KEY_NAME = "min_version"; + private static final String MAX_VERSION_KEY_NAME = "max_version"; + + private static final Schema API_VERSIONS_V0 = new Schema( + new Field(API_KEY_NAME, INT16, "API key."), + new Field(MIN_VERSION_KEY_NAME, INT16, "Minimum supported version."), + new Field(MAX_VERSION_KEY_NAME, INT16, "Maximum supported version.")); + + private static final Schema API_VERSIONS_RESPONSE_V0 = new Schema( + ERROR_CODE, + new Field(API_VERSIONS_KEY_NAME, new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker.")); + private static final Schema API_VERSIONS_RESPONSE_V1 = new Schema( + ERROR_CODE, + new Field(API_VERSIONS_KEY_NAME, new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."), + THROTTLE_TIME_MS); + + // initialized lazily to avoid circular initialization dependence with ApiKeys + private static volatile ApiVersionsResponse defaultApiVersionsResponse; + + public static Schema[] schemaVersions() { + return new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1}; + } /** * Possible error codes: @@ -83,8 +106,8 @@ public class 
ApiVersionsResponse extends AbstractResponse { } public ApiVersionsResponse(Struct struct) { - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + this.error = Errors.forCode(struct.get(ERROR_CODE)); List tempApiVersions = new ArrayList<>(); for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) { Struct apiVersionStruct = (Struct) apiVersionsObj; @@ -99,9 +122,8 @@ public class ApiVersionsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.API_VERSIONS.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(ERROR_CODE, error.code()); List apiVersionList = new ArrayList<>(); for (ApiVersion apiVersion : apiKeyToApiVersion.values()) { Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME); @@ -116,7 +138,7 @@ public class ApiVersionsResponse extends AbstractResponse { public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) { if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) { - return API_VERSIONS_RESPONSE; + return defaultApiVersionsResponse(); } return createApiVersionsResponse(throttleTimeMs, maxMagic); } @@ -141,22 +163,28 @@ public class ApiVersionsResponse extends AbstractResponse { return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer)); } + private Map buildApiKeyToApiVersion(List apiVersions) { + Map tempApiIdToApiVersion = new HashMap<>(); + for (ApiVersion apiVersion: apiVersions) { + tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion); + } + 
return tempApiIdToApiVersion; + } + public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) { - List versionList = new ArrayList<>(); + List versionList = new ArrayList<>(); for (ApiKeys apiKey : ApiKeys.values()) { if (apiKey.minRequiredInterBrokerMagic <= minMagic) { - versionList.add(new ApiVersion(apiKey)); + versionList.add(new ApiVersionsResponse.ApiVersion(apiKey)); } } return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList); } - private Map buildApiKeyToApiVersion(List apiVersions) { - Map tempApiIdToApiVersion = new HashMap<>(); - for (ApiVersion apiVersion: apiVersions) { - tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion); - } - return tempApiIdToApiVersion; + public static ApiVersionsResponse defaultApiVersionsResponse() { + if (defaultApiVersionsResponse == null) + defaultApiVersionsResponse = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE); + return defaultApiVersionsResponse; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java index 1b49c6a..c77bd13 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java @@ -19,14 +19,26 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; 
import java.nio.ByteBuffer; import java.util.Collections; +import static org.apache.kafka.common.protocol.types.Type.INT32; + public class ControlledShutdownRequest extends AbstractRequest { private static final String BROKER_ID_KEY_NAME = "broker_id"; + private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema( + new Field(BROKER_ID_KEY_NAME, INT32, "The id of the broker for which controlled shutdown has been requested.")); + private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0; + + public static Schema[] schemaVersions() { + return new Schema[] {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder { private final int brokerId; @@ -35,7 +47,7 @@ public class ControlledShutdownRequest extends AbstractRequest { } public Builder(int brokerId, Short desiredVersion) { - super(ApiKeys.CONTROLLED_SHUTDOWN_KEY, desiredVersion); + super(ApiKeys.CONTROLLED_SHUTDOWN, desiredVersion); this.brokerId = brokerId; } @@ -74,7 +86,7 @@ public class ControlledShutdownRequest extends AbstractRequest { return new ControlledShutdownResponse(Errors.forException(e), Collections.emptySet()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN_KEY.latestVersion())); + versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion())); } } @@ -84,12 +96,12 @@ public class ControlledShutdownRequest extends AbstractRequest { public static ControlledShutdownRequest parse(ByteBuffer buffer, short version) { return new ControlledShutdownRequest( - ApiKeys.CONTROLLED_SHUTDOWN_KEY.parseRequest(version, buffer), version); + ApiKeys.CONTROLLED_SHUTDOWN.parseRequest(version, buffer), version); } @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN_KEY.requestSchema(version())); + Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.requestSchema(version())); struct.set(BROKER_ID_KEY_NAME, brokerId); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java index 00973f0..e0b3860 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java @@ -19,6 +19,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,13 +30,28 @@ import java.util.HashSet; import java.util.List; 
import java.util.Set; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; + public class ControlledShutdownResponse extends AbstractResponse { - private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String PARTITIONS_REMAINING_KEY_NAME = "partitions_remaining"; - private static final String TOPIC_KEY_NAME = "topic"; - private static final String PARTITION_KEY_NAME = "partition"; + private static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema( + TOPIC_NAME, + PARTITION_ID); + + private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema( + ERROR_CODE, + new Field("partitions_remaining", new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " + + "that the broker still leads.")); + + private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0; + + public static Schema[] schemaVersions() { + return new Schema[]{CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1}; + } /** * Possible error codes: @@ -52,12 +70,12 @@ public class ControlledShutdownResponse extends AbstractResponse { } public ControlledShutdownResponse(Struct struct) { - error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + error = Errors.forCode(struct.get(ERROR_CODE)); Set<TopicPartition> partitions = new HashSet<>(); for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) { Struct topicPartition = (Struct) topicPartitionObj; - String topic = topicPartition.getString(TOPIC_KEY_NAME); - int partition = topicPartition.getInt(PARTITION_KEY_NAME); + String topic = topicPartition.get(TOPIC_NAME); + int partition = topicPartition.get(PARTITION_ID); partitions.add(new TopicPartition(topic, partition)); } partitionsRemaining = partitions; @@ -72,20 +90,19 @@ public class ControlledShutdownResponse extends 
AbstractResponse { } public static ControlledShutdownResponse parse(ByteBuffer buffer, short version) { - return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.parseResponse(version, buffer)); + return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN.parseResponse(version, buffer)); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN_KEY.responseSchema(version)); - - struct.set(ERROR_CODE_KEY_NAME, error.code()); + Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.responseSchema(version)); + struct.set(ERROR_CODE, error.code()); List<Struct> partitionsRemainingList = new ArrayList<>(partitionsRemaining.size()); for (TopicPartition topicPartition : partitionsRemaining) { Struct topicPartitionStruct = struct.instance(PARTITIONS_REMAINING_KEY_NAME); - topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic()); - topicPartitionStruct.set(PARTITION_KEY_NAME, topicPartition.partition()); + topicPartitionStruct.set(TOPIC_NAME, topicPartition.topic()); + topicPartitionStruct.set(PARTITION_ID, topicPartition.partition()); partitionsRemainingList.add(topicPartitionStruct); } struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java index 3598d4f..d281b3b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java @@ -20,6 +20,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import 
org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.utils.Utils; @@ -28,8 +31,28 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import static org.apache.kafka.common.protocol.CommonFields.HOST; +import static org.apache.kafka.common.protocol.CommonFields.OPERATION; +import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE; +import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE; + public class CreateAclsRequest extends AbstractRequest { - private final static String CREATIONS = "creations"; + private final static String CREATIONS_KEY_NAME = "creations"; + + private static final Schema CREATE_ACLS_REQUEST_V0 = new Schema( + new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema( + RESOURCE_TYPE, + RESOURCE_NAME, + PRINCIPAL, + HOST, + OPERATION, + PERMISSION_TYPE)))); + + public static Schema[] schemaVersions() { + return new Schema[]{CREATE_ACLS_REQUEST_V0}; + } public static class AclCreation { private final AclBinding acl; @@ -88,7 +111,7 @@ public class CreateAclsRequest extends AbstractRequest { public CreateAclsRequest(Struct struct, short version) { super(version); this.aclCreations = new ArrayList<>(); - for (Object creationStructObj : struct.getArray(CREATIONS)) { + for (Object creationStructObj : struct.getArray(CREATIONS_KEY_NAME)) { Struct creationStruct = (Struct) creationStructObj; aclCreations.add(AclCreation.fromStruct(creationStruct)); } @@ -99,11 +122,11 @@ public class CreateAclsRequest extends AbstractRequest { Struct struct = new 
Struct(ApiKeys.CREATE_ACLS.requestSchema(version())); List<Struct> requests = new ArrayList<>(); for (AclCreation creation : aclCreations) { - Struct creationStruct = struct.instance(CREATIONS); + Struct creationStruct = struct.instance(CREATIONS_KEY_NAME); creation.setStructFields(creationStruct); requests.add(creationStruct); } - struct.set(CREATIONS, requests.toArray()); + struct.set(CREATIONS_KEY_NAME, requests.toArray()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java index 1fc75da..836215e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java @@ -17,14 +17,31 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + public class CreateAclsResponse extends AbstractResponse { - private final static String CREATION_RESPONSES = "creation_responses"; + private final static String CREATION_RESPONSES_KEY_NAME = "creation_responses"; + + private static final Schema CREATE_ACLS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new 
Field(CREATION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( + ERROR_CODE, + ERROR_MESSAGE)))); + + public static Schema[] schemaVersions() { + return new Schema[]{CREATE_ACLS_RESPONSE_V0}; + } public static class AclCreationResponse { private final ApiError error; @@ -53,9 +70,9 @@ public class CreateAclsResponse extends AbstractResponse { } public CreateAclsResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); this.aclCreationResponses = new ArrayList<>(); - for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) { + for (Object responseStructObj : struct.getArray(CREATION_RESPONSES_KEY_NAME)) { Struct responseStruct = (Struct) responseStructObj; ApiError error = new ApiError(responseStruct); this.aclCreationResponses.add(new AclCreationResponse(error)); @@ -65,14 +82,14 @@ public class CreateAclsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); List<Struct> responseStructs = new ArrayList<>(); for (AclCreationResponse response : aclCreationResponses) { - Struct responseStruct = struct.instance(CREATION_RESPONSES); + Struct responseStruct = struct.instance(CREATION_RESPONSES_KEY_NAME); response.error.write(responseStruct); responseStructs.add(responseStruct); } - struct.set(CREATION_RESPONSES, responseStructs.toArray()); + struct.set(CREATION_RESPONSES_KEY_NAME, responseStructs.toArray()); return struct; }