http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 39c027b..4a60c94 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -19,6 +19,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.MemoryRecords; @@ -28,6 +31,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.INT8; + public class FetchRequest extends AbstractRequest { public static final int CONSUMER_REPLICA_ID = -1; private static final String REPLICA_ID_KEY_NAME = "replica_id"; @@ -40,14 +49,101 @@ public class FetchRequest extends AbstractRequest { private static final String MAX_BYTES_KEY_NAME = "max_bytes"; // topic level field names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; // partition level field names - private static final String PARTITION_KEY_NAME 
= "partition"; private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; + private static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema( + PARTITION_ID, + new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."), + new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch.")); + + // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed. + private static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema( + PARTITION_ID, + new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."), + new Field(LOG_START_OFFSET_KEY_NAME, INT64, "Earliest available offset of the follower replica. " + + "The field is only used when request is sent by follower. "), + new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch.")); + + private static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_REQUEST_PARTITION_V0), "Partitions to fetch.")); + + private static final Schema FETCH_REQUEST_TOPIC_V5 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_REQUEST_PARTITION_V5), "Partitions to fetch.")); + + private static final Schema FETCH_REQUEST_V0 = new Schema( + new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch.")); + + // The V1 Fetch Request body is the same as V0. + // Only the version number is incremented to indicate a newer client + private static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0; + // The V2 Fetch Request body is the same as V1. 
+ // Only the version number is incremented to indicate that the client supports message format V1, which uses + // relative offsets and has timestamps. + private static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1; + // Fetch Request V3 added a top-level max_bytes field - the total size of partition data to accumulate in the response. + // The partition ordering is now relevant - partitions will be processed in the order they appear in the request. + private static final Schema FETCH_REQUEST_V3 = new Schema( + new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + + "if the first message in the first non-empty partition of the fetch is larger than this " + + "value, the message will still be returned to ensure that progress can be made."), + new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch in the order provided.")); + + // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response). + private static final Schema FETCH_REQUEST_V4 = new Schema( + new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. 
Note that this is not an absolute maximum, " + + "if the first message in the first non-empty partition of the fetch is larger than this " + + "value, the message will still be returned to ensure that progress can be made."), + new Field(ISOLATION_LEVEL_KEY_NAME, INT8, "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " + + "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " + + "non-transactional and COMMITTED transactional records are visible. To be more concrete, " + + "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " + + "and enables the inclusion of the list of aborted transactions in the result, which allows " + + "consumers to discard ABORTED transactional records"), + new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch in the order provided.")); + + // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed. + private static final Schema FETCH_REQUEST_V5 = new Schema( + new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + + "if the first message in the first non-empty partition of the fetch is larger than this " + + "value, the message will still be returned to ensure that progress can be made."), + new Field(ISOLATION_LEVEL_KEY_NAME, INT8, "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " + + "(isolation_level = 0) makes all records visible. 
With READ_COMMITTED (isolation_level = 1), " + + "non-transactional and COMMITTED transactional records are visible. To be more concrete, " + + "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " + + "and enables the inclusion of the list of aborted transactions in the result, which allows " + + "consumers to discard ABORTED transactional records"), + new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V5), "Topics to fetch in the order provided.")); + + /** + * The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5. + * The version number is bumped up to indicate that the client supports KafkaStorageException. + * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5 + */ + private static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5; + + public static Schema[] schemaVersions() { + return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, + FETCH_REQUEST_V5, FETCH_REQUEST_V6}; + }; + // default values for older versions where a request level limit did not exist public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE; public static final long INVALID_LOG_START_OFFSET = -1L; @@ -191,10 +287,10 @@ public class FetchRequest extends AbstractRequest { fetchData = new LinkedHashMap<>(); for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.getString(TOPIC_KEY_NAME); + String topic = topicResponse.get(TOPIC_NAME); for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + int partition = partitionResponse.get(PARTITION_ID); long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME); int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME); long 
logStartOffset = partitionResponse.hasField(LOG_START_OFFSET_KEY_NAME) ? @@ -266,12 +362,12 @@ public class FetchRequest extends AbstractRequest { List topicArray = new ArrayList<>(); for (TopicAndPartitionData topicEntry : topicsData) { Struct topicData = struct.instance(TOPICS_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, topicEntry.topic); + topicData.set(TOPIC_NAME, topicEntry.topic); List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : topicEntry.partitions.entrySet()) { PartitionData fetchPartitionData = partitionEntry.getValue(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(PARTITION_ID, partitionEntry.getKey()); partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.fetchOffset); if (partitionData.hasField(LOG_START_OFFSET_KEY_NAME)) partitionData.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 281ad44..417e845 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -22,9 +22,10 @@ import org.apache.kafka.common.network.MultiSend; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; - +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import 
org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.Records; import java.nio.ByteBuffer; @@ -33,6 +34,14 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.RECORDS; +import static org.apache.kafka.common.protocol.types.Type.STRING; + /** * This wrapper supports all versions of the Fetch API */ @@ -41,13 +50,10 @@ public class FetchResponse extends AbstractResponse { private static final String RESPONSES_KEY_NAME = "responses"; // topic level field names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partition_responses"; // partition level field names private static final String PARTITION_HEADER_KEY_NAME = "partition_header"; - private static final String PARTITION_KEY_NAME = "partition"; - private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark"; private static final String LAST_STABLE_OFFSET_KEY_NAME = "last_stable_offset"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; @@ -58,7 +64,93 @@ public class FetchResponse extends AbstractResponse { private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String FIRST_OFFSET_KEY_NAME = "first_offset"; - private static final int DEFAULT_THROTTLE_TIME = 0; + private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema( + PARTITION_ID, + ERROR_CODE, + new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset.")); + private static 
final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema( + new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V0), + new Field(RECORD_SET_KEY_NAME, RECORDS)); + + private static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V0))); + + private static final Schema FETCH_RESPONSE_V0 = new Schema( + new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); + + private static final Schema FETCH_RESPONSE_V1 = new Schema( + THROTTLE_TIME_MS, + new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); + // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1, + // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1 + // (magic byte 0 and 1). For details, see ByteBufferMessageSet. + private static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1; + private static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2; + + // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the + // last stable offset). It also exposes messages with magic v2 (along with older formats). + private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema( + new Field(PRODUCER_ID_KEY_NAME, INT64, "The producer id associated with the aborted transactions"), + new Field(FIRST_OFFSET_KEY_NAME, INT64, "The first offset in the aborted transaction")); + + private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4; + + private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema( + PARTITION_ID, + ERROR_CODE, + new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset."), + new Field(LAST_STABLE_OFFSET_KEY_NAME, INT64, "The last stable offset (or LSO) of the partition. 
This is the last offset such that the state " + + "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"), + new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4))); + + // FETCH_RESPONSE_PARTITION_HEADER_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed. + private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema( + PARTITION_ID, + ERROR_CODE, + new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset."), + new Field(LAST_STABLE_OFFSET_KEY_NAME, INT64, "The last stable offset (or LSO) of the partition. This is the last offset such that the state " + + "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"), + new Field(LOG_START_OFFSET_KEY_NAME, INT64, "Earliest available offset."), + new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V5))); + + private static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema( + new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V4), + new Field(RECORD_SET_KEY_NAME, RECORDS)); + + private static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema( + new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V5), + new Field(RECORD_SET_KEY_NAME, RECORDS)); + + private static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V4))); + + private static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V5))); + + private static final Schema FETCH_RESPONSE_V4 = new Schema( + THROTTLE_TIME_MS, + new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V4))); + + private static final Schema FETCH_RESPONSE_V5 = new Schema( + THROTTLE_TIME_MS, + new Field(RESPONSES_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_TOPIC_V5))); + + /** + * The body of FETCH_RESPONSE_V6 is the same as FETCH_RESPONSE_V5. + * The version number is bumped up to indicate that the client supports KafkaStorageException. + * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5 + */ + private static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5; + + public static Schema[] schemaVersions() { + return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, + FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6}; + } + + public static final long INVALID_HIGHWATERMARK = -1L; public static final long INVALID_LAST_STABLE_OFFSET = -1L; public static final long INVALID_LOG_START_OFFSET = -1L; @@ -190,12 +282,12 @@ public class FetchResponse extends AbstractResponse { LinkedHashMap responseData = new LinkedHashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.getString(TOPIC_KEY_NAME); + String topic = topicResponse.get(TOPIC_NAME); for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { Struct partitionResponse = (Struct) partitionResponseObj; Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); - int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME); - Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME)); + int partition = partitionResponseHeader.get(PARTITION_ID); + Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE)); long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME); long lastStableOffset = INVALID_LAST_STABLE_OFFSET; if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) @@ -226,7 +318,7 @@ public class FetchResponse extends AbstractResponse { } } this.responseData = responseData; - this.throttleTimeMs = 
struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); } @Override @@ -266,7 +358,7 @@ public class FetchResponse extends AbstractResponse { private static void addResponseData(Struct struct, int throttleTimeMs, String dest, List sends) { Object[] allTopicData = struct.getArray(RESPONSES_KEY_NAME); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) { + if (struct.hasField(THROTTLE_TIME_MS)) { ByteBuffer buffer = ByteBuffer.allocate(8); buffer.putInt(throttleTimeMs); buffer.putInt(allTopicData.length); @@ -284,12 +376,12 @@ public class FetchResponse extends AbstractResponse { } private static void addTopicData(String dest, List sends, Struct topicData) { - String topic = topicData.getString(TOPIC_KEY_NAME); + String topic = topicData.get(TOPIC_NAME); Object[] allPartitionData = topicData.getArray(PARTITIONS_KEY_NAME); // include the topic header and the count for the number of partitions - ByteBuffer buffer = ByteBuffer.allocate(Type.STRING.sizeOf(topic) + 4); - Type.STRING.write(buffer, topic); + ByteBuffer buffer = ByteBuffer.allocate(STRING.sizeOf(topic) + 4); + STRING.write(buffer, topic); buffer.putInt(allPartitionData.length); buffer.rewind(); sends.add(new ByteBufferSend(dest, buffer)); @@ -313,13 +405,13 @@ public class FetchResponse extends AbstractResponse { sends.add(new RecordsSend(dest, records)); } - private static Struct toStruct(short version, LinkedHashMap responseData, int throttleTime) { + private static Struct toStruct(short version, LinkedHashMap responseData, int throttleTimeMs) { Struct struct = new Struct(ApiKeys.FETCH.responseSchema(version)); List> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData); List topicArray = new ArrayList<>(); for (FetchRequest.TopicAndPartitionData topicEntry: topicsData) { Struct topicData = struct.instance(RESPONSES_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, 
topicEntry.topic); + topicData.set(TOPIC_NAME, topicEntry.topic); List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : topicEntry.partitions.entrySet()) { PartitionData fetchPartitionData = partitionEntry.getValue(); @@ -332,8 +424,8 @@ public class FetchResponse extends AbstractResponse { errorCode = Errors.NOT_LEADER_FOR_PARTITION.code(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME); - partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionDataHeader.set(ERROR_CODE_KEY_NAME, errorCode); + partitionDataHeader.set(PARTITION_ID, partitionEntry.getKey()); + partitionDataHeader.set(ERROR_CODE, errorCode); partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark); if (partitionDataHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) { @@ -363,9 +455,7 @@ public class FetchResponse extends AbstractResponse { topicArray.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); - - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTime); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index fbc7fa2..c94bcde 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -20,15 +20,32 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.UnsupportedVersionException; import 
org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.types.Type.INT8; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class FindCoordinatorRequest extends AbstractRequest { private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key"; private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type"; + private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema( + new Field("group_id", STRING, "The unique group id.")); + + private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema( + new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " + + "for transactional producers, this is the transactional id)"), + new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)")); + + public static Schema[] schemaVersions() { + return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder { private final String coordinatorKey; private final CoordinatorType coordinatorType; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index ae6986a..f5d9805 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -19,16 +19,46 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -public class FindCoordinatorResponse extends AbstractResponse { +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.STRING; - private static final String ERROR_CODE_KEY_NAME = "error_code"; - private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; +public class FindCoordinatorResponse extends AbstractResponse { private static final String COORDINATOR_KEY_NAME = "coordinator"; + // coordinator level field names + private static final String NODE_ID_KEY_NAME = "node_id"; + private static final String HOST_KEY_NAME = "host"; + private static final String PORT_KEY_NAME = "port"; + + private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema( + new Field(NODE_ID_KEY_NAME, INT32, "The broker id."), + new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."), + new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests.")); + + private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema( + ERROR_CODE, + new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " + + "for a consumer group.")); + + private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE, + 
ERROR_MESSAGE, + new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator")); + + public static Schema[] schemaVersions() { + return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1}; + } + /** * Possible error codes: * @@ -37,10 +67,6 @@ public class FindCoordinatorResponse extends AbstractResponse { * GROUP_AUTHORIZATION_FAILED (30) */ - // coordinator level field names - private static final String NODE_ID_KEY_NAME = "node_id"; - private static final String HOST_KEY_NAME = "host"; - private static final String PORT_KEY_NAME = "port"; private final int throttleTimeMs; private final String errorMessage; @@ -59,12 +85,9 @@ public class FindCoordinatorResponse extends AbstractResponse { } public FindCoordinatorResponse(Struct struct) { - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; - error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); - if (struct.hasField(ERROR_MESSAGE_KEY_NAME)) - errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME); - else - errorMessage = null; + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + error = Errors.forCode(struct.get(ERROR_CODE)); + errorMessage = struct.getOrElse(ERROR_MESSAGE, null); Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); int nodeId = broker.getInt(NODE_ID_KEY_NAME); @@ -88,11 +111,9 @@ public class FindCoordinatorResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); - if (struct.hasField(ERROR_MESSAGE_KEY_NAME)) - struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(ERROR_CODE, 
error.code()); + struct.setIfExists(ERROR_MESSAGE, errorMessage); Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); coordinator.set(NODE_ID_KEY_NAME, node.id()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 7e08a55..00a806f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -18,15 +18,32 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class HeartbeatRequest extends AbstractRequest { private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id"; private static final String MEMBER_ID_KEY_NAME = "member_id"; + private static final Schema HEARTBEAT_REQUEST_V0 = new Schema( + new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), + new Field(GROUP_GENERATION_ID_KEY_NAME, INT32, "The generation of the group."), + new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator.")); + + /* v1 request is the same as v0. 
Throttle time has been added to response */ + private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0; + + public static Schema[] schemaVersions() { + return new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder { private final String groupId; private final int groupGenerationId; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index cec39f0..e41d937 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -18,13 +18,25 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + public class HeartbeatResponse extends AbstractResponse { - private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final Schema HEARTBEAT_RESPONSE_V0 = new Schema( + ERROR_CODE); + private static final Schema HEARTBEAT_RESPONSE_V1 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE); + + public static Schema[] schemaVersions() { + return new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1}; + } /** * Possible error codes: @@ -49,8 +61,8 @@ public class HeartbeatResponse extends AbstractResponse { } public HeartbeatResponse(Struct struct) { - this.throttleTimeMs = 
struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; - error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + error = Errors.forCode(struct.get(ERROR_CODE)); } public int throttleTimeMs() { @@ -64,9 +76,8 @@ public class HeartbeatResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(ERROR_CODE, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java index 45f88a2..fa14a97 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java @@ -18,16 +18,29 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; + public class InitProducerIdRequest extends AbstractRequest { public static final int NO_TRANSACTION_TIMEOUT_MS 
= Integer.MAX_VALUE; private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms"; + private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema( + new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional id whose producer id we want to retrieve or generate."), + new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer.")); + + public static Schema[] schemaVersions() { + return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0}; + } + private final String transactionalId; private final int transactionTimeoutMs; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java index 88fb09c..8799ad7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java @@ -18,11 +18,18 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.types.Type.INT64; + 
public class InitProducerIdResponse extends AbstractResponse { // Possible error codes: // NotCoordinator @@ -33,7 +40,19 @@ public class InitProducerIdResponse extends AbstractResponse { private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String EPOCH_KEY_NAME = "producer_epoch"; - private static final String ERROR_CODE_KEY_NAME = "error_code"; + + private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE, + new Field(PRODUCER_ID_KEY_NAME, INT64, "The producer id for the input transactional id. If the input " + + "id was empty, then this is used only for ensuring idempotence of messages."), + new Field(EPOCH_KEY_NAME, INT16, "The epoch for the producer id. Will always be 0 if no transactional " + + "id was specified in the request.")); + + public static Schema[] schemaVersions() { + return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0}; + } + private final int throttleTimeMs; private final Errors error; private final long producerId; @@ -47,8 +66,8 @@ public class InitProducerIdResponse extends AbstractResponse { } public InitProducerIdResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); + this.error = Errors.forCode(struct.get(ERROR_CODE)); this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); this.epoch = struct.getShort(EPOCH_KEY_NAME); } @@ -76,10 +95,10 @@ public class InitProducerIdResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); struct.set(PRODUCER_ID_KEY_NAME, producerId); struct.set(EPOCH_KEY_NAME, epoch); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.set(ERROR_CODE, error.code()); return 
struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index ff07d13..b2ff133 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.Utils; @@ -26,6 +29,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.kafka.common.protocol.types.Type.BYTES; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class JoinGroupRequest extends AbstractRequest { private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout"; @@ -36,6 +43,38 @@ public class JoinGroupRequest extends AbstractRequest { private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name"; private static final String PROTOCOL_METADATA_KEY_NAME = "protocol_metadata"; + /* Join group api */ + private static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema( + new Field(PROTOCOL_NAME_KEY_NAME, STRING), + new Field(PROTOCOL_METADATA_KEY_NAME, BYTES)); + + private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema( + new 
Field(GROUP_ID_KEY_NAME, STRING, "The group id."), + new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives " + + "no heartbeat after this timeout in ms."), + new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or an empty string for a new consumer."), + new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), + new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + + "that the member supports")); + + private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema( + new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), + new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives no " + + "heartbeat after this timeout in ms."), + new Field(REBALANCE_TIMEOUT_KEY_NAME, INT32, "The maximum time that the coordinator will wait for each " + + "member to rejoin when rebalancing the group"), + new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or an empty string for a new consumer."), + new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), + new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + + "that the member supports")); + + /* v2 request is the same as v1. 
Throttle time has been added to response */ + private static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1; + + public static Schema[] schemaVersions() { + return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2}; + } + public static final String UNKNOWN_MEMBER_ID = ""; private final String groupId; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index eb86ce7..a4431b9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -26,9 +29,53 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.BYTES; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class JoinGroupResponse extends AbstractResponse { - private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String GENERATION_ID_KEY_NAME = "generation_id"; + private static final String 
GROUP_PROTOCOL_KEY_NAME = "group_protocol"; + private static final String LEADER_ID_KEY_NAME = "leader_id"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; + private static final String MEMBERS_KEY_NAME = "members"; + + private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; + + private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema( + new Field(MEMBER_ID_KEY_NAME, STRING), + new Field(MEMBER_METADATA_KEY_NAME, BYTES)); + + private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema( + ERROR_CODE, + new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."), + new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), + new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), + new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by the group coordinator."), + new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); + + private static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; + + private static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE, + new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."), + new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), + new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), + new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by the group coordinator."), + new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); + + + public static Schema[] schemaVersions() { + return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2}; + } + + public static final String UNKNOWN_PROTOCOL = ""; + public static final int UNKNOWN_GENERATION_ID = -1; + public static final String UNKNOWN_MEMBER_ID = ""; /** * Possible error codes: @@ -42,18 +89,6 @@ public class JoinGroupResponse extends AbstractResponse { * 
GROUP_AUTHORIZATION_FAILED (30) */ - private static final String GENERATION_ID_KEY_NAME = "generation_id"; - private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol"; - private static final String LEADER_ID_KEY_NAME = "leader_id"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; - private static final String MEMBERS_KEY_NAME = "members"; - - private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; - - public static final String UNKNOWN_PROTOCOL = ""; - public static final int UNKNOWN_GENERATION_ID = -1; - public static final String UNKNOWN_MEMBER_ID = ""; - private final int throttleTimeMs; private final Errors error; private final int generationId; @@ -88,7 +123,7 @@ public class JoinGroupResponse extends AbstractResponse { } public JoinGroupResponse(Struct struct) { - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); members = new HashMap<>(); for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) { @@ -97,7 +132,7 @@ public class JoinGroupResponse extends AbstractResponse { ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME); members.put(memberId, memberMetadata); } - error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + error = Errors.forCode(struct.get(ERROR_CODE)); generationId = struct.getInt(GENERATION_ID_KEY_NAME); groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME); memberId = struct.getString(MEMBER_ID_KEY_NAME); @@ -143,10 +178,9 @@ public class JoinGroupResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.JOIN_GROUP.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, 
error.code()); + struct.set(ERROR_CODE, error.code()); struct.set(GENERATION_ID_KEY_NAME, generationId); struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol); struct.set(MEMBER_ID_KEY_NAME, memberId); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index bd379b8..73f037f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -20,6 +20,9 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.Utils; @@ -31,6 +34,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class LeaderAndIsrRequest extends AbstractRequest { private static final String CONTROLLER_ID_KEY_NAME = "controller_id"; private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch"; @@ -38,8 +47,6 @@ public class LeaderAndIsrRequest extends AbstractRequest { private static final String 
LIVE_LEADERS_KEY_NAME = "live_leaders"; // partition_states key names - private static final String TOPIC_KEY_NAME = "topic"; - private static final String PARTITION_KEY_NAME = "partition"; private static final String LEADER_KEY_NAME = "leader"; private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch"; private static final String ISR_KEY_NAME = "isr"; @@ -52,6 +59,51 @@ public class LeaderAndIsrRequest extends AbstractRequest { private static final String HOST_KEY_NAME = "host"; private static final String PORT_KEY_NAME = "port"; + private static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 = new Schema( + TOPIC_NAME, + PARTITION_ID, + new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."), + new Field(LEADER_KEY_NAME, INT32, "The broker id for the leader."), + new Field(LEADER_EPOCH_KEY_NAME, INT32, "The leader epoch."), + new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The in sync replica ids."), + new Field(ZK_VERSION_KEY_NAME, INT32, "The ZK version."), + new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The replica ids.")); + + // LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 added a per-partition is_new Field. + // This field specifies whether the replica should have existed on the broker or not. 
+ private static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 = new Schema( + TOPIC_NAME, + PARTITION_ID, + new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."), + new Field(LEADER_KEY_NAME, INT32, "The broker id for the leader."), + new Field(LEADER_EPOCH_KEY_NAME, INT32, "The leader epoch."), + new Field(ISR_KEY_NAME, new ArrayOf(INT32), "The in sync replica ids."), + new Field(ZK_VERSION_KEY_NAME, INT32, "The ZK version."), + new Field(REPLICAS_KEY_NAME, new ArrayOf(INT32), "The replica ids."), + new Field(IS_NEW_KEY_NAME, BOOLEAN, "Whether the replica should have existed on the broker or not")); + + private static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 = new Schema( + new Field(END_POINT_ID_KEY_NAME, INT32, "The broker id."), + new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."), + new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests.")); + + private static final Schema LEADER_AND_ISR_REQUEST_V0 = new Schema( + new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."), + new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."), + new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)), + new Field(LIVE_LEADERS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0))); + + // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should have existed on the broker or not. 
+ private static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema( + new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."), + new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."), + new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1)), + new Field(LIVE_LEADERS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0))); + + public static Schema[] schemaVersions() { + return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder { private final int controllerId; private final int controllerEpoch; @@ -105,8 +157,8 @@ public class LeaderAndIsrRequest extends AbstractRequest { Map partitionStates = new HashMap<>(); for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) { Struct partitionStateData = (Struct) partitionStateDataObj; - String topic = partitionStateData.getString(TOPIC_KEY_NAME); - int partition = partitionStateData.getInt(PARTITION_KEY_NAME); + String topic = partitionStateData.get(TOPIC_NAME); + int partition = partitionStateData.get(PARTITION_ID); int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME); int leader = partitionStateData.getInt(LEADER_KEY_NAME); int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME); @@ -154,8 +206,8 @@ public class LeaderAndIsrRequest extends AbstractRequest { for (Map.Entry entry : partitionStates.entrySet()) { Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); TopicPartition topicPartition = entry.getKey(); - partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); - partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); + partitionStateData.set(TOPIC_NAME, topicPartition.topic()); + partitionStateData.set(PARTITION_ID, topicPartition.partition()); PartitionState partitionState = entry.getValue(); partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, 
partitionState.basePartitionState.controllerEpoch); partitionStateData.set(LEADER_KEY_NAME, partitionState.basePartitionState.leader); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java index bc4400e..39b8c37 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -19,6 +19,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,14 +30,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class LeaderAndIsrResponse extends AbstractResponse { +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; - private static final String ERROR_CODE_KEY_NAME = "error_code"; +public class LeaderAndIsrResponse extends AbstractResponse { private static final String PARTITIONS_KEY_NAME = "partitions"; - private static final String PARTITIONS_TOPIC_KEY_NAME = "topic"; - private static final String PARTITIONS_PARTITION_KEY_NAME = "partition"; - private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code"; + private static final Schema 
LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema( + TOPIC_NAME, + PARTITION_ID, + ERROR_CODE); + private static final Schema LEADER_AND_ISR_RESPONSE_V0 = new Schema( + ERROR_CODE, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0))); + + // LeaderAndIsrResponse V1 may receive KAFKA_STORAGE_ERROR in the response + private static final Schema LEADER_AND_ISR_RESPONSE_V1 = LEADER_AND_ISR_RESPONSE_V0; + + public static Schema[] schemaVersions() { + return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1}; + } /** * Possible error code: @@ -54,13 +70,13 @@ public class LeaderAndIsrResponse extends AbstractResponse { responses = new HashMap<>(); for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) { Struct responseData = (Struct) responseDataObj; - String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME); - int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME); - Errors error = Errors.forCode(responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME)); + String topic = responseData.get(TOPIC_NAME); + int partition = responseData.get(PARTITION_ID); + Errors error = Errors.forCode(responseData.get(ERROR_CODE)); responses.put(new TopicPartition(topic, partition), error); } - error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + error = Errors.forCode(struct.get(ERROR_CODE)); } public Map responses() { @@ -83,14 +99,14 @@ public class LeaderAndIsrResponse extends AbstractResponse { for (Map.Entry response : responses.entrySet()) { Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); TopicPartition partition = response.getKey(); - partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); - partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); - partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code()); + partitionData.set(TOPIC_NAME, partition.topic()); + partitionData.set(PARTITION_ID, partition.partition()); + 
partitionData.set(ERROR_CODE, response.getValue().code()); responseDatas.add(partitionData); } struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.set(ERROR_CODE, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java index 76e076e..661eb7f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java @@ -16,15 +16,31 @@ */ package org.apache.kafka.common.requests; -import java.nio.ByteBuffer; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import java.nio.ByteBuffer; + +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class LeaveGroupRequest extends AbstractRequest { private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String MEMBER_ID_KEY_NAME = "member_id"; + private static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema( + new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), + new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator.")); + + /* v1 request is the same as v0. 
Throttle time has been added to response */ + private static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0; + + public static Schema[] schemaVersions() { + return new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder { private final String groupId; private final String memberId; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index 1c85850..bef21e9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -18,13 +18,25 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + public class LeaveGroupResponse extends AbstractResponse { - private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema( + ERROR_CODE); + private static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE); + + public static Schema[] schemaVersions() { + return new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1}; + } /** * Possible error code: @@ -48,8 +60,8 @@ public class LeaveGroupResponse extends AbstractResponse { } public LeaveGroupResponse(Struct struct) { - 
this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; - error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + this.error = Errors.forCode(struct.get(ERROR_CODE)); } public int throttleTimeMs() { @@ -63,9 +75,8 @@ public class LeaveGroupResponse extends AbstractResponse { @Override public Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(ERROR_CODE, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java index 3d4f2b8..f279b4c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -18,12 +18,24 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.Collections; public class ListGroupsRequest extends AbstractRequest { + + /* List groups api */ + private static final Schema LIST_GROUPS_REQUEST_V0 = new Schema(); + + /* v1 request is the same as v0. 
Throttle time has been added to response */ + private static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0; + + public static Schema[] schemaVersions() { + return new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder { public Builder() { super(ApiKeys.LIST_GROUPS); @@ -49,7 +61,7 @@ public class ListGroupsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + public ListGroupsResponse getErrorResponse(int throttleTimeMs, Throwable e) { short versionId = version(); switch (versionId) { case 0: http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index 8ae3792..cdf4c59 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -18,19 +18,39 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class ListGroupsResponse extends 
AbstractResponse { - public static final String ERROR_CODE_KEY_NAME = "error_code"; - public static final String GROUPS_KEY_NAME = "groups"; - public static final String GROUP_ID_KEY_NAME = "group_id"; - public static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; + private static final String GROUPS_KEY_NAME = "groups"; + private static final String GROUP_ID_KEY_NAME = "group_id"; + private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; + + private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema( + new Field("group_id", STRING), + new Field("protocol_type", STRING)); + private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema( + ERROR_CODE, + new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE, + new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + + public static Schema[] schemaVersions() { + return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1}; + } /** * Possible error codes: @@ -54,8 +74,8 @@ public class ListGroupsResponse extends AbstractResponse { } public ListGroupsResponse(Struct struct) { - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + this.error = Errors.forCode(struct.get(ERROR_CODE)); this.groups = new ArrayList<>(); for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { Struct groupStruct = (Struct) groupObj; @@ -99,9 +119,8 @@ public class ListGroupsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.LIST_GROUPS.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(ERROR_CODE, error.code()); List groupList = new ArrayList<>(); for (Group group : groups) { Struct groupStruct = struct.instance(GROUPS_KEY_NAME); @@ -113,14 +132,6 @@ public class ListGroupsResponse extends AbstractResponse { return struct; } - public static ListGroupsResponse fromError(Errors error) { - return fromError(DEFAULT_THROTTLE_TIME, error); - } - - public static ListGroupsResponse fromError(int throttleTimeMs, Errors error) { - return new ListGroupsResponse(throttleTimeMs, error, Collections.emptyList()); - } - public static ListGroupsResponse parse(ByteBuffer buffer, short version) { return new ListGroupsResponse(ApiKeys.LIST_GROUPS.parseResponse(version, buffer)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 03f6ee5..ace582d 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -20,6 +20,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; @@ -32,6 +35,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.INT8; + public class ListOffsetRequest extends AbstractRequest { public static final long EARLIEST_TIMESTAMP = -2L; public static final long LATEST_TIMESTAMP = -1L; @@ -44,14 +53,49 @@ public class ListOffsetRequest extends AbstractRequest { private static final String TOPICS_KEY_NAME = "topics"; // topic level field names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; // partition level field names - private static final String PARTITION_KEY_NAME = "partition"; private static final String TIMESTAMP_KEY_NAME = "timestamp"; private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets"; + private static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema( + PARTITION_ID, + new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp."), + new Field(MAX_NUM_OFFSETS_KEY_NAME, INT32, "Maximum offsets to return.")); + 
private static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema( + PARTITION_ID, + new Field(TIMESTAMP_KEY_NAME, INT64, "The target timestamp for the partition.")); + + private static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0), "Partitions to list offset.")); + private static final Schema LIST_OFFSET_REQUEST_TOPIC_V1 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V1), "Partitions to list offset.")); + + private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema( + new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), + new Field(TOPICS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0), "Topics to list offsets.")); + private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema( + new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), + new Field(TOPICS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), "Topics to list offsets.")); + + private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema( + new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), + new Field(ISOLATION_LEVEL_KEY_NAME, INT8, "This setting controls the visibility of transactional records. " + + "Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " + + "(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. 
" + + "To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " + + "LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " + + "result, which allows consumers to discard ABORTED transactional records"), + new Field(TOPICS_KEY_NAME, new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), "Topics to list offsets."));; + + + public static Schema[] schemaVersions() { + return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2}; + } + private final int replicaId; private final IsolationLevel isolationLevel; private final Map offsetData; @@ -193,10 +237,10 @@ public class ListOffsetRequest extends AbstractRequest { partitionTimestamps = new HashMap<>(); for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.getString(TOPIC_KEY_NAME); + String topic = topicResponse.get(TOPIC_NAME); for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + int partition = partitionResponse.get(PARTITION_ID); long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); TopicPartition tp = new TopicPartition(topic, partition); if (partitionResponse.hasField(MAX_NUM_OFFSETS_KEY_NAME)) { @@ -283,20 +327,20 @@ public class ListOffsetRequest extends AbstractRequest { List topicArray = new ArrayList<>(); for (Map.Entry> topicEntry: topicsData.entrySet()) { Struct topicData = struct.instance(TOPICS_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + topicData.set(TOPIC_NAME, topicEntry.getKey()); List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { if (version == 0) { PartitionData offsetPartitionData = (PartitionData) partitionEntry.getValue(); Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(PARTITION_ID, partitionEntry.getKey()); partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets); partitionArray.add(partitionData); } else { Long timestamp = (Long) partitionEntry.getValue(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(PARTITION_ID, partitionEntry.getKey()); partitionData.set(TIMESTAMP_KEY_NAME, timestamp); partitionArray.add(partitionData); }