KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Jun Rao <junrao@gmail.com>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #2614 from hachikuji/exactly-once-message-format
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bd06f1d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bd06f1d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bd06f1d
Branch: refs/heads/trunk
Commit: 5bd06f1d542e6b588a1d402d059bc24690017d32
Parents: f615c9e
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Mar 24 19:38:36 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Mar 24 19:38:43 2017 +0000
----------------------------------------------------------------------
checkstyle/checkstyle.xml | 2 +-
checkstyle/import-control.xml | 2 +
checkstyle/suppressions.xml | 6 +-
.../org/apache/kafka/clients/ApiVersions.java | 66 ++
.../org/apache/kafka/clients/KafkaClient.java | 6 +-
.../org/apache/kafka/clients/NetworkClient.java | 27 +-
.../apache/kafka/clients/NodeApiVersions.java | 126 ++--
.../kafka/clients/consumer/ConsumerRecord.java | 4 +-
.../kafka/clients/consumer/KafkaConsumer.java | 6 +-
.../clients/consumer/internals/Fetcher.java | 50 +-
.../kafka/clients/producer/KafkaProducer.java | 27 +-
.../kafka/clients/producer/MockProducer.java | 28 +-
.../kafka/clients/producer/RecordMetadata.java | 4 +-
.../internals/ProduceRequestResult.java | 12 +-
.../producer/internals/ProducerBatch.java | 12 +-
.../internals/ProducerInterceptors.java | 4 +-
.../producer/internals/RecordAccumulator.java | 19 +-
.../clients/producer/internals/Sender.java | 39 +-
.../apache/kafka/common/protocol/Protocol.java | 94 ++-
.../kafka/common/protocol/types/Type.java | 24 +-
.../record/AbstractLegacyRecordBatch.java | 456 ++++++++++++++
.../common/record/AbstractRecordBatch.java | 31 +
.../kafka/common/record/AbstractRecords.java | 185 ++++--
.../common/record/ByteBufferLogInputStream.java | 85 +--
.../kafka/common/record/CompressionType.java | 4 +-
.../kafka/common/record/ControlRecordType.java | 87 +++
.../kafka/common/record/DefaultRecord.java | 457 ++++++++++++++
.../kafka/common/record/DefaultRecordBatch.java | 435 +++++++++++++
.../kafka/common/record/FileLogInputStream.java | 250 ++++++--
.../apache/kafka/common/record/FileRecords.java | 129 ++--
.../org/apache/kafka/common/record/Header.java | 64 ++
.../kafka/common/record/LegacyRecord.java | 570 +++++++++++++++++
.../apache/kafka/common/record/LogEntry.java | 171 -----
.../kafka/common/record/LogInputStream.java | 24 +-
.../kafka/common/record/MemoryRecords.java | 277 ++++----
.../common/record/MemoryRecordsBuilder.java | 348 +++++++---
.../kafka/common/record/MutableRecordBatch.java | 45 ++
.../org/apache/kafka/common/record/Record.java | 630 ++-----------------
.../apache/kafka/common/record/RecordBatch.java | 203 ++++++
.../common/record/RecordBatchIterator.java | 43 ++
.../org/apache/kafka/common/record/Records.java | 72 ++-
.../kafka/common/record/RecordsIterator.java | 222 -------
.../kafka/common/record/SimpleRecord.java | 109 ++++
.../kafka/common/record/TimestampType.java | 13 -
.../kafka/common/requests/AbstractRequest.java | 4 +
.../kafka/common/requests/FetchRequest.java | 31 +-
.../kafka/common/requests/FetchResponse.java | 87 ++-
.../kafka/common/requests/IsolationLevel.java | 42 ++
.../kafka/common/requests/ProduceRequest.java | 86 ++-
.../kafka/common/requests/ProduceResponse.java | 5 +-
.../org/apache/kafka/common/utils/Crc32.java | 29 +
.../org/apache/kafka/common/utils/Utils.java | 89 ++-
.../apache/kafka/clients/ApiVersionsTest.java | 46 ++
.../org/apache/kafka/clients/MockClient.java | 9 +-
.../apache/kafka/clients/NetworkClientTest.java | 21 +-
.../kafka/clients/NodeApiVersionsTest.java | 30 +
.../clients/consumer/KafkaConsumerTest.java | 16 +-
.../internals/ConsumerCoordinatorTest.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 107 +++-
.../kafka/clients/producer/RecordSendTest.java | 12 +-
.../producer/internals/ProducerBatchTest.java | 6 +-
.../internals/RecordAccumulatorTest.java | 112 +++-
.../clients/producer/internals/SenderTest.java | 153 ++++-
.../record/AbstractLegacyRecordBatchTest.java | 167 +++++
.../record/ByteBufferLogInputStreamTest.java | 116 ++--
.../common/record/CompressionTypeTest.java | 8 +-
.../common/record/ControlRecordTypeTest.java | 48 ++
.../common/record/DefaultRecordBatchTest.java | 210 +++++++
.../kafka/common/record/DefaultRecordTest.java | 93 +++
.../common/record/FileLogInputStreamTest.java | 62 ++
.../kafka/common/record/FileRecordsTest.java | 229 +++----
.../kafka/common/record/LegacyRecordTest.java | 129 ++++
.../common/record/MemoryRecordsBuilderTest.java | 297 ++++++---
.../kafka/common/record/MemoryRecordsTest.java | 222 +++++--
.../apache/kafka/common/record/RecordTest.java | 129 ----
.../common/record/SimpleLegacyRecordTest.java | 87 +++
.../kafka/common/record/SimpleRecordTest.java | 144 -----
.../kafka/common/record/TimestampTypeTest.java | 42 --
.../common/requests/RequestResponseTest.java | 108 +++-
.../apache/kafka/common/utils/Crc32Test.java | 61 ++
.../apache/kafka/common/utils/UtilsTest.java | 92 +++
.../runtime/distributed/WorkerGroupMember.java | 4 +-
.../apache/kafka/connect/util/ConnectUtils.java | 4 +-
.../connect/runtime/WorkerSinkTaskTest.java | 6 +-
.../main/scala/kafka/admin/AdminClient.scala | 3 +-
core/src/main/scala/kafka/api/ApiVersion.scala | 33 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 3 +-
.../kafka/consumer/ConsumerFetcherThread.scala | 2 +-
.../controller/ControllerChannelManager.scala | 5 +-
.../kafka/coordinator/GroupCoordinator.scala | 8 +-
.../coordinator/GroupMetadataManager.scala | 264 ++++----
core/src/main/scala/kafka/log/Log.scala | 63 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 34 +-
core/src/main/scala/kafka/log/LogSegment.scala | 35 +-
.../src/main/scala/kafka/log/LogValidator.scala | 259 ++++----
core/src/main/scala/kafka/log/TimeIndex.scala | 6 +-
.../kafka/message/ByteBufferMessageSet.scala | 32 +-
core/src/main/scala/kafka/message/Message.scala | 35 +-
.../scala/kafka/message/MessageAndOffset.scala | 14 +-
.../main/scala/kafka/message/MessageSet.scala | 2 +-
.../scala/kafka/network/RequestChannel.scala | 5 +-
.../kafka/server/AbstractFetcherThread.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 47 +-
.../main/scala/kafka/server/KafkaConfig.scala | 10 +-
.../main/scala/kafka/server/KafkaServer.scala | 5 +-
.../kafka/server/ReplicaFetcherThread.scala | 10 +-
.../scala/kafka/server/ReplicaManager.scala | 3 +-
.../scala/kafka/tools/DumpLogSegments.scala | 120 ++--
.../main/scala/kafka/tools/MirrorMaker.scala | 4 +-
.../kafka/tools/ReplicaVerificationTool.scala | 27 +-
.../admin/BrokerApiVersionsCommandTest.scala | 4 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 6 +-
.../api/GroupCoordinatorIntegrationTest.scala | 4 +-
.../kafka/api/PlaintextConsumerTest.scala | 6 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 4 +-
.../ReplicaFetcherThreadFatalErrorTest.scala | 3 +-
.../tools/ReplicaVerificationToolTest.scala | 6 +-
.../scala/kafka/tools/TestLogCleaning.scala | 10 +-
.../test/scala/other/kafka/StressTestLog.scala | 4 +-
.../other/kafka/TestLinearWriteSpeed.scala | 10 +-
.../GroupCoordinatorResponseTest.scala | 23 +-
.../coordinator/GroupMetadataManagerTest.scala | 65 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 10 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 48 +-
.../log/LogCleanerLagIntegrationTest.scala | 6 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 14 +-
.../scala/unit/kafka/log/LogCleanerTest.scala | 61 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 66 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 166 ++---
.../scala/unit/kafka/log/LogValidatorTest.scala | 510 +++++++++++----
.../kafka/message/BaseMessageSetTestCases.scala | 3 +-
.../unit/kafka/network/SocketServerTest.scala | 8 +-
.../unit/kafka/producer/SyncProducerTest.scala | 4 +-
.../server/AbstractFetcherThreadTest.scala | 30 +-
.../unit/kafka/server/EdgeCaseRequestTest.scala | 11 +-
.../unit/kafka/server/FetchRequestTest.scala | 45 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 13 +-
.../unit/kafka/server/ProduceRequestTest.scala | 21 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 24 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 12 +-
.../unit/kafka/server/SimpleFetchTest.scala | 53 +-
.../unit/kafka/tools/MirrorMakerTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 28 +-
docs/upgrade.html | 30 +-
.../processor/internals/StreamsKafkaClient.java | 4 +-
145 files changed, 7555 insertions(+), 3405 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 9a4a37f..b775025 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -98,7 +98,7 @@
<module name="MethodLength"/>
<module name="ParameterNumber">
<!-- default is 8 -->
- <property name="max" value="10"/>
+ <property name="max" value="12"/>
</module>
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 80747e1..3475062 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -95,6 +95,8 @@
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.network" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a39695f..f722aba 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -11,6 +11,8 @@
files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator).java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/>
+ <suppress checks="ClassFanOutComplexity"
+ files=".*/common/utils/Utils.java"/>
<suppress checks="MethodLength"
files="KerberosLogin.java"/>
@@ -25,6 +27,8 @@
files="Fetcher.java"/>
<suppress checks="ParameterNumber"
files="ConfigDef.java"/>
+ <suppress checks="ParameterNumber"
+ files="DefaultRecordBatch.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse).java"/>
@@ -79,7 +83,7 @@
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
- files="(Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse)Test.java"/>
+ files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
new file mode 100644
index 0000000..9c61ff2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.ProduceRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
+ * The pattern is akin to the use of {@link Metadata} for topic metadata.
+ *
+ * NOTE: This class is intended for INTERNAL usage only within Kafka.
+ */
+public class ApiVersions {
+
+ private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
+ private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+
+ public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
+ this.nodeApiVersions.put(nodeId, nodeApiVersions);
+ this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+ }
+
+ public synchronized void remove(String nodeId) {
+ this.nodeApiVersions.remove(nodeId);
+ this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+ }
+
+ public synchronized NodeApiVersions get(String nodeId) {
+ return this.nodeApiVersions.get(nodeId);
+ }
+
+ private byte computeMaxUsableProduceMagic() {
+ // use a magic version which is supported by all brokers to reduce the chance that
+ // we will need to convert the messages when they are ready to be sent.
+ byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+ for (NodeApiVersions versions : this.nodeApiVersions.values()) {
+ byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.usableVersion(ApiKeys.PRODUCE));
+ maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic);
+ }
+ return maxUsableMagic;
+ }
+
+ public synchronized byte maxUsableProduceMagic() {
+ return maxUsableProduceMagic;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 83a0009..9d63d43 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.clients;
-import java.io.Closeable;
-import java.util.List;
-
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;
+import java.io.Closeable;
+import java.util.List;
+
/**
* The interface for {@link NetworkClient}
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b6f8b0e..a279fe4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -42,7 +42,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -99,7 +98,7 @@ public class NetworkClient implements KafkaClient {
*/
private final boolean discoverBrokerVersions;
- private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
+ private final ApiVersions apiVersions;
private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>();
@@ -114,9 +113,10 @@ public class NetworkClient implements KafkaClient {
int socketReceiveBuffer,
int requestTimeoutMs,
Time time,
- boolean discoverBrokerVersions) {
- this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
- reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions);
+ boolean discoverBrokerVersions,
+ ApiVersions apiVersions) {
+ this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
+ socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions, apiVersions);
}
public NetworkClient(Selectable selector,
@@ -128,9 +128,10 @@ public class NetworkClient implements KafkaClient {
int socketReceiveBuffer,
int requestTimeoutMs,
Time time,
- boolean discoverBrokerVersions) {
+ boolean discoverBrokerVersions,
+ ApiVersions apiVersions) {
this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
- socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions);
+ socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions, apiVersions);
}
private NetworkClient(MetadataUpdater metadataUpdater,
@@ -143,7 +144,8 @@ public class NetworkClient implements KafkaClient {
int socketReceiveBuffer,
int requestTimeoutMs,
Time time,
- boolean discoverBrokerVersions) {
+ boolean discoverBrokerVersions,
+ ApiVersions apiVersions) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
@@ -167,6 +169,7 @@ public class NetworkClient implements KafkaClient {
this.reconnectBackoffMs = reconnectBackoffMs;
this.time = time;
this.discoverBrokerVersions = discoverBrokerVersions;
+ this.apiVersions = apiVersions;
}
/**
@@ -285,7 +288,7 @@ public class NetworkClient implements KafkaClient {
}
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
- NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
+ NodeApiVersions versionInfo = apiVersions.get(nodeId);
short version;
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
@@ -296,7 +299,7 @@ public class NetworkClient implements KafkaClient {
log.trace("No version information found when sending message of type {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), nodeId, version);
} else {
- version = versionInfo.usableVersion(clientRequest.apiKey());
+ version = versionInfo.usableVersion(clientRequest.apiKey(), builder.desiredVersion());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
@@ -482,7 +485,7 @@ public class NetworkClient implements KafkaClient {
*/
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
connectionStates.disconnected(nodeId, now);
- nodeApiVersions.remove(nodeId);
+ apiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId);
@@ -568,7 +571,7 @@ public class NetworkClient implements KafkaClient {
return;
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
- nodeApiVersions.put(node, nodeVersionInfo);
+ apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
if (log.isDebugEnabled()) {
log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index f216c60..dc2e6d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -21,27 +21,24 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
import org.apache.kafka.common.utils.Utils;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.TreeMap;
/**
* An internal class which represents the API versions supported by a particular node.
*/
public class NodeApiVersions {
- private static final short NODE_TOO_OLD = (short) -1;
- private static final short NODE_TOO_NEW = (short) -2;
- private final Collection<ApiVersion> nodeApiVersions;
+ // A map of the usable versions of each API, keyed by the ApiKeys instance
+ private final Map<ApiKeys, UsableVersion> usableVersions = new EnumMap<>(ApiKeys.class);
- /**
- * An array of the usable versions of each API, indexed by the ApiKeys ID.
- */
- private final Map<ApiKeys, Short> usableVersions = new EnumMap<>(ApiKeys.class);
+ // List of APIs which the broker supports, but which are unknown to the client
+ private final List<ApiVersion> unknownApis = new ArrayList<>();
/**
* Create a NodeApiVersions object with the current ApiVersions.
@@ -77,19 +74,13 @@ public class NodeApiVersions {
}
public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) {
- this.nodeApiVersions = nodeApiVersions;
for (ApiVersion nodeApiVersion : nodeApiVersions) {
- // Newer brokers may support ApiKeys we don't know about, ignore them
if (ApiKeys.hasId(nodeApiVersion.apiKey)) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey);
- short v = Utils.min(nodeApiKey.latestVersion(), nodeApiVersion.maxVersion);
- if (v < nodeApiVersion.minVersion) {
- usableVersions.put(nodeApiKey, NODE_TOO_NEW);
- } else if (v < nodeApiKey.oldestVersion()) {
- usableVersions.put(nodeApiKey, NODE_TOO_OLD);
- } else {
- usableVersions.put(nodeApiKey, v);
- }
+ usableVersions.put(nodeApiKey, new UsableVersion(nodeApiKey, nodeApiVersion));
+ } else {
+ // Newer brokers may support ApiKeys we don't know about
+ unknownApis.add(nodeApiVersion);
}
}
}
@@ -98,22 +89,29 @@ public class NodeApiVersions {
* Return the most recent version supported by both the node and the local software.
*/
public short usableVersion(ApiKeys apiKey) {
- Short usableVersion = usableVersions.get(apiKey);
+ return usableVersion(apiKey, null);
+ }
+
+ /**
+ * Return the desired version (if usable) or the latest usable version if the desired version is null.
+ */
+ public short usableVersion(ApiKeys apiKey, Short desiredVersion) {
+ UsableVersion usableVersion = usableVersions.get(apiKey);
if (usableVersion == null)
throw new UnsupportedVersionException("The broker does not support " + apiKey);
- else if (usableVersion == NODE_TOO_OLD)
- throw new UnsupportedVersionException("The broker is too old to support " + apiKey +
- " version " + apiKey.oldestVersion());
- else if (usableVersion == NODE_TOO_NEW)
- throw new UnsupportedVersionException("The broker is too new to support " + apiKey +
- " version " + apiKey.latestVersion());
- else
- return usableVersion;
+
+ if (desiredVersion == null) {
+ usableVersion.ensureUsable();
+ return usableVersion.value;
+ } else {
+ usableVersion.ensureUsable(desiredVersion);
+ return desiredVersion;
+ }
}
/**
* Convert the object to a string with no linebreaks.<p/>
- *
+ * <p>
* This toString method is relatively expensive, so avoid calling it unless debug logging is turned on.
*/
@Override
@@ -131,7 +129,9 @@ public class NodeApiVersions {
// a TreeMap before printing it out to ensure that we always print in
// ascending order.
TreeMap<Short, String> apiKeysText = new TreeMap<>();
- for (ApiVersion apiVersion : this.nodeApiVersions)
+ for (UsableVersion usableVersion : this.usableVersions.values())
+ apiKeysText.put(usableVersion.apiVersion.apiKey, apiVersionToText(usableVersion.apiVersion));
+ for (ApiVersion apiVersion : unknownApis)
apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion));
// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
@@ -173,23 +173,75 @@ public class NodeApiVersions {
}
if (apiKey != null) {
- Short usableVersion = usableVersions.get(apiKey);
- if (usableVersion == NODE_TOO_OLD)
+ UsableVersion usableVersion = usableVersions.get(apiKey);
+ if (usableVersion.isTooOld())
bld.append(" [unusable: node too old]");
- else if (usableVersion == NODE_TOO_NEW)
+ else if (usableVersion.isTooNew())
bld.append(" [unusable: node too new]");
else
- bld.append(" [usable: ").append(usableVersion).append("]");
+ bld.append(" [usable: ").append(usableVersion.value).append("]");
}
return bld.toString();
}
+ /**
+ * Get the version information for a given API.
+ *
+ * @param apiKey The api key to lookup
+ * @return The api version information from the broker or null if it is unsupported
+ */
public ApiVersion apiVersion(ApiKeys apiKey) {
- for (ApiVersion nodeApiVersion : nodeApiVersions) {
- if (nodeApiVersion.apiKey == apiKey.id) {
- return nodeApiVersion;
+ UsableVersion usableVersion = usableVersions.get(apiKey);
+ if (usableVersion == null)
+ return null;
+ return usableVersion.apiVersion;
+ }
+
+ private static class UsableVersion {
+ private static final short NODE_TOO_OLD = (short) -1;
+ private static final short NODE_TOO_NEW = (short) -2;
+
+ private final ApiKeys apiKey;
+ private final ApiVersion apiVersion;
+ private final Short value;
+
+ private UsableVersion(ApiKeys apiKey, ApiVersion nodeApiVersion) {
+ this.apiKey = apiKey;
+ this.apiVersion = nodeApiVersion;
+ short v = Utils.min(apiKey.latestVersion(), nodeApiVersion.maxVersion);
+ if (v < nodeApiVersion.minVersion) {
+ this.value = NODE_TOO_NEW;
+ } else if (v < apiKey.oldestVersion()) {
+ this.value = NODE_TOO_OLD;
+ } else {
+ this.value = v;
}
}
- throw new NoSuchElementException();
+
+ private boolean isTooOld() {
+ return value == NODE_TOO_OLD;
+ }
+
+ private boolean isTooNew() {
+ return value == NODE_TOO_NEW;
+ }
+
+ private void ensureUsable() {
+ if (value == NODE_TOO_OLD)
+ throw new UnsupportedVersionException("The broker is too old to support " + apiKey +
+ " version " + apiKey.oldestVersion());
+ else if (value == NODE_TOO_NEW)
+ throw new UnsupportedVersionException("The broker is too new to support " + apiKey +
+ " version " + apiKey.latestVersion());
+ }
+
+ private void ensureUsable(short desiredVersion) {
+ if (apiVersion.minVersion > desiredVersion || apiVersion.maxVersion < desiredVersion)
+ throw new UnsupportedVersionException("The broker does not support the requested version " + desiredVersion +
+ " for api " + apiKey + ". Supported versions are " + apiVersion.minVersion +
+ " to " + apiVersion.maxVersion + ".");
+ }
+
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 26c3768..fe9ede8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
/**
@@ -25,7 +25,7 @@ import org.apache.kafka.common.record.TimestampType;
* to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord.
*/
public class ConsumerRecord<K, V> {
- public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;
+ public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 612f446..3894b5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
@@ -663,13 +664,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,
clientId,
- 100, // a fixed large enough value will suffice
+ 100, // a fixed large enough value will suffice for max in-flight requests
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
time,
- true);
+ true,
+ new ApiVersions());
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 441206a..7236653 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -45,7 +45,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
@@ -769,21 +769,34 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
boolean skippedRecords = false;
- for (LogEntry logEntry : partition.records.deepEntries()) {
- // Skip the messages earlier than current position.
- if (logEntry.offset() >= position) {
- parsed.add(parseRecord(tp, logEntry));
- bytes += logEntry.sizeInBytes();
- } else
- skippedRecords = true;
+ for (RecordBatch batch : partition.records.batches()) {
+ if (this.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+ try {
+ batch.ensureValid();
+ } catch (InvalidRecordException e) {
+ throw new KafkaException("Record batch for partition " + partition + " at offset " +
+ batch.baseOffset() + " is invalid, cause: " + e.getMessage());
+ }
+ }
+
+ for (Record record : batch) {
+ // control records should not be returned to the user. also skip anything out of range
+ if (record.isControlRecord() || record.offset() < position) {
+ skippedRecords = true;
+ } else {
+ parsed.add(parseRecord(tp, batch, record));
+ bytes += record.sizeInBytes();
+ }
+ }
}
recordsCount = parsed.size();
- log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
+ log.trace("Adding {} fetched record(s) for partition {} with offset {} to buffered record list",
+ parsed.size(), tp, position);
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
- if (parsed.isEmpty() && !skippedRecords && (partition.records.sizeInBytes() > 0)) {
+ if (parsed.isEmpty() && !skippedRecords && partition.records.sizeInBytes() > 0) {
if (completedFetch.responseVersion < 3) {
// Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
@@ -842,25 +855,22 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
return parsedRecords;
}
- /**
- * Parse the record entry, deserializing the key / value fields if necessary
- */
- private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
- Record record = logEntry.record();
-
+ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
+ RecordBatch batch,
+ Record record) {
if (this.checkCrcs) {
try {
record.ensureValid();
} catch (InvalidRecordException e) {
- throw new KafkaException("Record for partition " + partition + " at offset " + logEntry.offset()
+ throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
+ " is invalid, cause: " + e.getMessage());
}
}
try {
- long offset = logEntry.offset();
+ long offset = record.offset();
long timestamp = record.timestamp();
- TimestampType timestampType = record.timestampType();
+ TimestampType timestampType = batch.timestampType();
ByteBuffer keyBytes = record.key();
byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
@@ -875,7 +885,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
key, value);
} catch (RuntimeException e) {
throw new SerializationException("Error deserializing key/value for partition " + partition +
- " at offset " + logEntry.offset(), e);
+ " at offset " + record.offset(), e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 33da0c4..78dd668 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
@@ -43,9 +44,9 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
@@ -157,6 +158,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final long maxBlockTimeMs;
private final int requestTimeoutMs;
private final ProducerInterceptors<K, V> interceptors;
+ private final ApiVersions apiVersions;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -296,19 +298,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}
+ this.apiVersions = new ApiVersions();
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
- time);
+ time,
+ apiVersions);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient client = new NetworkClient(
- new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
+ new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+ this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
@@ -317,7 +322,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs,
time,
- true);
+ true,
+ apiVersions);
this.sender = new Sender(client,
this.metadata,
this.accumulator,
@@ -327,7 +333,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
Time.SYSTEM,
- this.requestTimeoutMs);
+ this.requestTimeoutMs,
+ apiVersions);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
@@ -472,14 +479,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
int partition = partition(record, serializedKey, serializedValue, cluster);
- int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
+ int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
+ serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
- RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
+ RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
+ serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
@@ -824,7 +833,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (this.interceptors != null) {
if (metadata == null) {
- this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
+ this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
exception);
} else {
this.interceptors.onAcknowledgement(metadata, exception);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 35f5d94..22797b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -16,6 +16,17 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
+import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.serialization.Serializer;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -26,17 +37,6 @@ import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.serialization.Serializer;
-
/**
* A mock of the producer interface you can use for testing code that uses Kafka.
@@ -118,10 +118,10 @@ public class MockProducer<K, V> implements Producer<K, V> {
partition = partition(record, this.cluster);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
- FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP, 0, 0, 0);
+ FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
long offset = nextOffset(topicPartition);
Completion completion = new Completion(offset,
- new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP, 0, 0, 0),
+ new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0),
result, callback);
this.sent.add(record);
if (autoComplete)
@@ -245,7 +245,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
}
public void complete(RuntimeException e) {
- result.set(e == null ? offset : -1L, Record.NO_TIMESTAMP, e);
+ result.set(e == null ? offset : -1L, RecordBatch.NO_TIMESTAMP, e);
if (callback != null) {
if (e == null)
callback.onCompletion(metadata, null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index db34487..2d06ea8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -17,7 +17,7 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
/**
* The metadata for a record that has been acknowledged by the server
@@ -54,7 +54,7 @@ public final class RecordMetadata {
@Deprecated
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
- this(topicPartition, baseOffset, relativeOffset, Record.NO_TIMESTAMP, -1, -1, -1);
+ this(topicPartition, baseOffset, relativeOffset, RecordBatch.NO_TIMESTAMP, -1, -1, -1);
}
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset,
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index 0a73c41..fbfef61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.clients.producer.internals;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
@@ -35,7 +35,7 @@ public final class ProduceRequestResult {
private final TopicPartition topicPartition;
private volatile Long baseOffset = null;
- private volatile long logAppendTime = Record.NO_TIMESTAMP;
+ private volatile long logAppendTime = RecordBatch.NO_TIMESTAMP;
private volatile RuntimeException error;
/**
@@ -97,7 +97,7 @@ public final class ProduceRequestResult {
* Return true if log append time is being used for this topic
*/
public boolean hasLogAppendTime() {
- return logAppendTime != Record.NO_TIMESTAMP;
+ return logAppendTime != RecordBatch.NO_TIMESTAMP;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 46273a1..2b90f81 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -20,9 +20,10 @@ import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,11 +74,11 @@ public final class ProducerBatch {
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
- if (!recordsBuilder.hasRoomFor(key, value)) {
+ if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
return null;
} else {
long checksum = this.recordsBuilder.append(timestamp, key, value);
- this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
+ this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
@@ -173,7 +174,7 @@ public final class ProducerBatch {
void expirationDone() {
if (expiryErrorMessage == null)
throw new IllegalStateException("Batch has not expired");
- this.done(-1L, Record.NO_TIMESTAMP,
+ this.done(-1L, RecordBatch.NO_TIMESTAMP,
new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
}
@@ -243,4 +244,7 @@ public final class ProducerBatch {
return !recordsBuilder.isClosed();
}
+ public byte magic() {
+ return recordsBuilder.magic();
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index da3be01..e4ab4c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +113,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
interceptTopicPartition = new TopicPartition(record.topic(),
record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
}
- interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
+ interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
exception);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5d95f53..1e495d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
@@ -27,11 +28,11 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
@@ -73,6 +74,7 @@ public final class RecordAccumulator {
private final long retryBackoffMs;
private final BufferPool free;
private final Time time;
+ private final ApiVersions apiVersions;
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
private final IncompleteBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
@@ -92,6 +94,7 @@ public final class RecordAccumulator {
* exhausting all retries in a short period of time.
* @param metrics The metrics
* @param time The time instance to use
+ * @param apiVersions Request API versions for current connected brokers
*/
public RecordAccumulator(int batchSize,
long totalSize,
@@ -99,7 +102,8 @@ public final class RecordAccumulator {
long lingerMs,
long retryBackoffMs,
Metrics metrics,
- Time time) {
+ Time time,
+ ApiVersions apiVersions) {
this.drainIndex = 0;
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
@@ -114,6 +118,7 @@ public final class RecordAccumulator {
this.incomplete = new IncompleteBatches();
this.muted = new HashSet<>();
this.time = time;
+ this.apiVersions = apiVersions;
registerMetrics(metrics, metricGrpName);
}
@@ -182,7 +187,8 @@ public final class RecordAccumulator {
}
// we don't have an in-progress record batch try to allocate a new batch
- int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
+ byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+ int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
@@ -196,7 +202,8 @@ public final class RecordAccumulator {
return appendResult;
}
- MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
+ MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, maxUsableMagic, compression,
+ TimestampType.CREATE_TIME, this.batchSize);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
@@ -526,7 +533,7 @@ public final class RecordAccumulator {
batch.close();
dq.remove(batch);
}
- batch.done(-1L, Record.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));
+ batch.done(-1L, RecordBatch.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));
deallocate(batch);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3604f68..a7394e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
@@ -95,6 +96,9 @@ public class Sender implements Runnable {
/* the max time to wait for the server to respond to the request*/
private final int requestTimeout;
+ /* current request API versions supported by the known brokers */
+ private final ApiVersions apiVersions;
+
public Sender(KafkaClient client,
Metadata metadata,
RecordAccumulator accumulator,
@@ -104,7 +108,8 @@ public class Sender implements Runnable {
int retries,
Metrics metrics,
Time time,
- int requestTimeout) {
+ int requestTimeout,
+ ApiVersions apiVersions) {
this.client = client;
this.accumulator = accumulator;
this.metadata = metadata;
@@ -116,6 +121,7 @@ public class Sender implements Runnable {
this.time = time;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
+ this.apiVersions = apiVersions;
}
/**
@@ -162,8 +168,7 @@ public class Sender implements Runnable {
/**
* Run a single iteration of sending
*
- * @param now
- * The current POSIX time in milliseconds
+ * @param now The current POSIX time in milliseconds
*/
void run(long now) {
Cluster cluster = metadata.fetch();
@@ -347,16 +352,38 @@ public class Sender implements Runnable {
* Create a produce request from the given record batches
*/
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
+ if (batches.isEmpty())
+ return;
+
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
+
+ // find the minimum magic version used when creating the record sets
+ byte minUsedMagic = apiVersions.maxUsableProduceMagic();
+ for (ProducerBatch batch : batches) {
+ if (batch.magic() < minUsedMagic)
+ minUsedMagic = batch.magic();
+ }
+
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
- produceRecordsByPartition.put(tp, batch.records());
+ MemoryRecords records = batch.records();
+
+ // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
+ // that the producer starts building the batch and the time that we send the request, and we may have
+ // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
+ // the new message format, but found that the broker didn't support it, so we need to down-convert on the
+ // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
+ // not all support the same message format version. For example, if a partition migrates from a broker
+ // which is supporting the new magic version to one which doesn't, then we will need to convert.
+ if (!records.hasMatchingMagic(minUsedMagic))
+ records = batch.records().downConvert(minUsedMagic);
+ produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
- ProduceRequest.Builder requestBuilder =
- new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
+ ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(minUsedMagic, acks, timeout,
+ produceRecordsByPartition);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3343133..8c3e08c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -31,6 +31,7 @@ import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@@ -165,6 +166,21 @@ public class Protocol {
*/
public static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
+ // Produce request V3 adds the transactional id which is used for authorization when attempting to write
+ // transactional data. This version also adds support for message format V2.
+ public static final Schema PRODUCE_REQUEST_V3 = new Schema(
+ new Field("transactional_id",
+ NULLABLE_STRING,
+ "The transactional ID of the producer. This is used to authorize transaction produce requests. " +
+ "This can be null for non-transactional producers."),
+ new Field("acks",
+ INT16,
+ "The number of acknowledgments the producer requires the leader to have received before " +
+ "considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader " +
+ "and -1 for the full ISR."),
+ new Field("timeout", INT32, "The time to await a response in ms."),
+ new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
new ArrayOf(new Schema(new Field("topic", STRING),
new Field("partition_responses",
@@ -204,8 +220,10 @@ public class Protocol {
"Duration in milliseconds for which the request was throttled" +
" due to quota violation. (Zero if the request did not violate any quota.)",
0));
- public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2};
- public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2};
+ public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
+
+ public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
+ public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3};
/* Offset commit api */
public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -499,7 +517,7 @@ public class Protocol {
// Only the version number is incremented to indicate the client support message format V1 which uses
// relative offset and has timestamp.
public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
- // FETCH_REQUEST_V3 added top level max_bytes field - the total size of partition data to accumulate in response.
+ // Fetch Request V3 added top level max_bytes field - the total size of partition data to accumulate in response.
// The partition ordering is now relevant - partitions will be processed in order they appear in request.
public static final Schema FETCH_REQUEST_V3 = new Schema(new Field("replica_id",
INT32,
@@ -519,6 +537,34 @@ public class Protocol {
new ArrayOf(FETCH_REQUEST_TOPIC_V0),
"Topics to fetch in the order provided."));
+ // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response).
+ public static final Schema FETCH_REQUEST_V4 = new Schema(
+ new Field("replica_id",
+ INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("max_wait_time",
+ INT32,
+ "Maximum time in ms to wait for the response."),
+ new Field("min_bytes",
+ INT32,
+ "Minimum bytes to accumulate in the response."),
+ new Field("max_bytes",
+ INT32,
+ "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+ "if the first message in the first non-empty partition of the fetch is larger than this " +
+ "value, the message will still be returned to ensure that progress can be made."),
+ new Field("isolation_level",
+ INT8,
+ "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+ "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+ "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+ "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+ "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+ "consumers to discard ABORTED transactional records"),
+ new Field("topics",
+ new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch in the order provided."));
+
public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
@@ -549,8 +595,46 @@ public class Protocol {
public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
- public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3};
- public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3};
+
+ // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
+ // last stable offset). It also exposes messages with magic v2 (along with older formats).
+ private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
+ new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
+ new Field("first_offset", INT64, "The first offset in the aborted transaction"));
+
+ public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
+ new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code", INT16),
+ new Field("high_watermark",
+ INT64,
+ "Last committed offset."),
+ new Field("last_stable_offset",
+ INT64,
+ "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
+ "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
+ new Field("aborted_transactions",
+ ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
+
+ public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
+ new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
+ new Field("record_set", RECORDS));
+
+ public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
+ new Field("topic", STRING),
+ new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
+
+ public static final Schema FETCH_RESPONSE_V4 = new Schema(
+ new Field("throttle_time_ms",
+ INT32,
+ "Duration in milliseconds for which the request was throttled " +
+ "due to quota violation (zero if the request did not violate any quota).",
+ 0),
+ new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
+
+ public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4};
+ public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4};
/* List groups api */
public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 39e46fd..ffca09c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -16,14 +16,14 @@
*/
package org.apache.kafka.common.protocol.types;
-import java.nio.ByteBuffer;
-
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Utils;
+import java.nio.ByteBuffer;
+
/**
* A serializable type
*/
@@ -235,16 +235,15 @@ public abstract class Type {
}
@Override
- public Object read(ByteBuffer buffer) {
+ public String read(ByteBuffer buffer) {
short length = buffer.getShort();
if (length < 0)
throw new SchemaException("String length " + length + " cannot be negative");
if (length > buffer.remaining())
throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available");
-
- byte[] bytes = new byte[length];
- buffer.get(bytes);
- return Utils.utf8(bytes);
+ String result = Utils.utf8(buffer, length);
+ buffer.position(buffer.position() + length);
+ return result;
}
@Override
@@ -287,16 +286,15 @@ public abstract class Type {
}
@Override
- public Object read(ByteBuffer buffer) {
+ public String read(ByteBuffer buffer) {
short length = buffer.getShort();
if (length < 0)
return null;
if (length > buffer.remaining())
throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available");
-
- byte[] bytes = new byte[length];
- buffer.get(bytes);
- return Utils.utf8(bytes);
+ String result = Utils.utf8(buffer, length);
+ buffer.position(buffer.position() + length);
+ return result;
}
@Override
@@ -443,7 +441,7 @@ public abstract class Type {
}
@Override
- public Object read(ByteBuffer buffer) {
+ public Records read(ByteBuffer buffer) {
ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer);
return MemoryRecords.readableRecords(recordsBuffer);
}
|