kafka-commits mailing list archives

From ij...@apache.org
Subject [11/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date Fri, 24 Mar 2017 19:44:04 GMT
KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2614 from hachikuji/exactly-once-message-format


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bd06f1d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bd06f1d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bd06f1d

Branch: refs/heads/trunk
Commit: 5bd06f1d542e6b588a1d402d059bc24690017d32
Parents: f615c9e
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Mar 24 19:38:36 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Mar 24 19:38:43 2017 +0000

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   2 +-
 checkstyle/import-control.xml                   |   2 +
 checkstyle/suppressions.xml                     |   6 +-
 .../org/apache/kafka/clients/ApiVersions.java   |  66 ++
 .../org/apache/kafka/clients/KafkaClient.java   |   6 +-
 .../org/apache/kafka/clients/NetworkClient.java |  27 +-
 .../apache/kafka/clients/NodeApiVersions.java   | 126 ++--
 .../kafka/clients/consumer/ConsumerRecord.java  |   4 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   6 +-
 .../clients/consumer/internals/Fetcher.java     |  50 +-
 .../kafka/clients/producer/KafkaProducer.java   |  27 +-
 .../kafka/clients/producer/MockProducer.java    |  28 +-
 .../kafka/clients/producer/RecordMetadata.java  |   4 +-
 .../internals/ProduceRequestResult.java         |  12 +-
 .../producer/internals/ProducerBatch.java       |  12 +-
 .../internals/ProducerInterceptors.java         |   4 +-
 .../producer/internals/RecordAccumulator.java   |  19 +-
 .../clients/producer/internals/Sender.java      |  39 +-
 .../apache/kafka/common/protocol/Protocol.java  |  94 ++-
 .../kafka/common/protocol/types/Type.java       |  24 +-
 .../record/AbstractLegacyRecordBatch.java       | 456 ++++++++++++++
 .../common/record/AbstractRecordBatch.java      |  31 +
 .../kafka/common/record/AbstractRecords.java    | 185 ++++--
 .../common/record/ByteBufferLogInputStream.java |  85 +--
 .../kafka/common/record/CompressionType.java    |   4 +-
 .../kafka/common/record/ControlRecordType.java  |  87 +++
 .../kafka/common/record/DefaultRecord.java      | 457 ++++++++++++++
 .../kafka/common/record/DefaultRecordBatch.java | 435 +++++++++++++
 .../kafka/common/record/FileLogInputStream.java | 250 ++++++--
 .../apache/kafka/common/record/FileRecords.java | 129 ++--
 .../org/apache/kafka/common/record/Header.java  |  64 ++
 .../kafka/common/record/LegacyRecord.java       | 570 +++++++++++++++++
 .../apache/kafka/common/record/LogEntry.java    | 171 -----
 .../kafka/common/record/LogInputStream.java     |  24 +-
 .../kafka/common/record/MemoryRecords.java      | 277 ++++----
 .../common/record/MemoryRecordsBuilder.java     | 348 +++++++---
 .../kafka/common/record/MutableRecordBatch.java |  45 ++
 .../org/apache/kafka/common/record/Record.java  | 630 ++-----------------
 .../apache/kafka/common/record/RecordBatch.java | 203 ++++++
 .../common/record/RecordBatchIterator.java      |  43 ++
 .../org/apache/kafka/common/record/Records.java |  72 ++-
 .../kafka/common/record/RecordsIterator.java    | 222 -------
 .../kafka/common/record/SimpleRecord.java       | 109 ++++
 .../kafka/common/record/TimestampType.java      |  13 -
 .../kafka/common/requests/AbstractRequest.java  |   4 +
 .../kafka/common/requests/FetchRequest.java     |  31 +-
 .../kafka/common/requests/FetchResponse.java    |  87 ++-
 .../kafka/common/requests/IsolationLevel.java   |  42 ++
 .../kafka/common/requests/ProduceRequest.java   |  86 ++-
 .../kafka/common/requests/ProduceResponse.java  |   5 +-
 .../org/apache/kafka/common/utils/Crc32.java    |  29 +
 .../org/apache/kafka/common/utils/Utils.java    |  89 ++-
 .../apache/kafka/clients/ApiVersionsTest.java   |  46 ++
 .../org/apache/kafka/clients/MockClient.java    |   9 +-
 .../apache/kafka/clients/NetworkClientTest.java |  21 +-
 .../kafka/clients/NodeApiVersionsTest.java      |  30 +
 .../clients/consumer/KafkaConsumerTest.java     |  16 +-
 .../internals/ConsumerCoordinatorTest.java      |   2 +-
 .../clients/consumer/internals/FetcherTest.java | 107 +++-
 .../kafka/clients/producer/RecordSendTest.java  |  12 +-
 .../producer/internals/ProducerBatchTest.java   |   6 +-
 .../internals/RecordAccumulatorTest.java        | 112 +++-
 .../clients/producer/internals/SenderTest.java  | 153 ++++-
 .../record/AbstractLegacyRecordBatchTest.java   | 167 +++++
 .../record/ByteBufferLogInputStreamTest.java    | 116 ++--
 .../common/record/CompressionTypeTest.java      |   8 +-
 .../common/record/ControlRecordTypeTest.java    |  48 ++
 .../common/record/DefaultRecordBatchTest.java   | 210 +++++++
 .../kafka/common/record/DefaultRecordTest.java  |  93 +++
 .../common/record/FileLogInputStreamTest.java   |  62 ++
 .../kafka/common/record/FileRecordsTest.java    | 229 +++----
 .../kafka/common/record/LegacyRecordTest.java   | 129 ++++
 .../common/record/MemoryRecordsBuilderTest.java | 297 ++++++---
 .../kafka/common/record/MemoryRecordsTest.java  | 222 +++++--
 .../apache/kafka/common/record/RecordTest.java  | 129 ----
 .../common/record/SimpleLegacyRecordTest.java   |  87 +++
 .../kafka/common/record/SimpleRecordTest.java   | 144 -----
 .../kafka/common/record/TimestampTypeTest.java  |  42 --
 .../common/requests/RequestResponseTest.java    | 108 +++-
 .../apache/kafka/common/utils/Crc32Test.java    |  61 ++
 .../apache/kafka/common/utils/UtilsTest.java    |  92 +++
 .../runtime/distributed/WorkerGroupMember.java  |   4 +-
 .../apache/kafka/connect/util/ConnectUtils.java |   4 +-
 .../connect/runtime/WorkerSinkTaskTest.java     |   6 +-
 .../main/scala/kafka/admin/AdminClient.scala    |   3 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  33 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   3 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |   2 +-
 .../controller/ControllerChannelManager.scala   |   5 +-
 .../kafka/coordinator/GroupCoordinator.scala    |   8 +-
 .../coordinator/GroupMetadataManager.scala      | 264 ++++----
 core/src/main/scala/kafka/log/Log.scala         |  63 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  34 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  35 +-
 .../src/main/scala/kafka/log/LogValidator.scala | 259 ++++----
 core/src/main/scala/kafka/log/TimeIndex.scala   |   6 +-
 .../kafka/message/ByteBufferMessageSet.scala    |  32 +-
 core/src/main/scala/kafka/message/Message.scala |  35 +-
 .../scala/kafka/message/MessageAndOffset.scala  |  14 +-
 .../main/scala/kafka/message/MessageSet.scala   |   2 +-
 .../scala/kafka/network/RequestChannel.scala    |   5 +-
 .../kafka/server/AbstractFetcherThread.scala    |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  47 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  10 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   5 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  10 +-
 .../scala/kafka/server/ReplicaManager.scala     |   3 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 120 ++--
 .../main/scala/kafka/tools/MirrorMaker.scala    |   4 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |  27 +-
 .../admin/BrokerApiVersionsCommandTest.scala    |   4 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   6 +-
 .../api/GroupCoordinatorIntegrationTest.scala   |   4 +-
 .../kafka/api/PlaintextConsumerTest.scala       |   6 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |   4 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   3 +-
 .../tools/ReplicaVerificationToolTest.scala     |   6 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |  10 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   4 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |  10 +-
 .../GroupCoordinatorResponseTest.scala          |  23 +-
 .../coordinator/GroupMetadataManagerTest.scala  |  65 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |  10 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  48 +-
 .../log/LogCleanerLagIntegrationTest.scala      |   6 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  14 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  61 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  66 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 166 ++---
 .../scala/unit/kafka/log/LogValidatorTest.scala | 510 +++++++++++----
 .../kafka/message/BaseMessageSetTestCases.scala |   3 +-
 .../unit/kafka/network/SocketServerTest.scala   |   8 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |   4 +-
 .../server/AbstractFetcherThreadTest.scala      |  30 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala |  11 +-
 .../unit/kafka/server/FetchRequestTest.scala    |  45 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |  13 +-
 .../unit/kafka/server/ProduceRequestTest.scala  |  21 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala |  24 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  12 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  53 +-
 .../unit/kafka/tools/MirrorMakerTest.scala      |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  28 +-
 docs/upgrade.html                               |  30 +-
 .../processor/internals/StreamsKafkaClient.java |   4 +-
 145 files changed, 7555 insertions(+), 3405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 9a4a37f..b775025 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -98,7 +98,7 @@
     <module name="MethodLength"/>
     <module name="ParameterNumber">
       <!-- default is 8 -->
-      <property name="max" value="10"/>
+      <property name="max" value="12"/>
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 80747e1..3475062 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -95,6 +95,8 @@
       <allow pkg="net.jpountz" />
       <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.network" />
+      <allow pkg="org.apache.kafka.common.protocol" />
+      <allow pkg="org.apache.kafka.common.protocol.types" />
       <allow pkg="org.apache.kafka.common.errors" />
     </subpackage>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a39695f..f722aba 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -11,6 +11,8 @@
               files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator).java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/protocol/Errors.java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files=".*/common/utils/Utils.java"/>
 
     <suppress checks="MethodLength"
               files="KerberosLogin.java"/>
@@ -25,6 +27,8 @@
               files="Fetcher.java"/>
     <suppress checks="ParameterNumber"
               files="ConfigDef.java"/>
+    <suppress checks="ParameterNumber"
+              files="DefaultRecordBatch.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse).java"/>
@@ -79,7 +83,7 @@
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse)Test.java"/>
+              files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
               files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
new file mode 100644
index 0000000..9c61ff2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.ProduceRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
+ * The pattern is akin to the use of {@link Metadata} for topic metadata.
+ *
+ * NOTE: This class is intended for INTERNAL usage only within Kafka.
+ */
+public class ApiVersions {
+
+    private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
+    private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+
+    public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
+        this.nodeApiVersions.put(nodeId, nodeApiVersions);
+        this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+    }
+
+    public synchronized void remove(String nodeId) {
+        this.nodeApiVersions.remove(nodeId);
+        this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+    }
+
+    public synchronized NodeApiVersions get(String nodeId) {
+        return this.nodeApiVersions.get(nodeId);
+    }
+
+    private byte computeMaxUsableProduceMagic() {
+        // use a magic version which is supported by all brokers to reduce the chance that
+        // we will need to convert the messages when they are ready to be sent.
+        byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+        for (NodeApiVersions versions : this.nodeApiVersions.values()) {
+            byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.usableVersion(ApiKeys.PRODUCE));
+            maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic);
+        }
+        return maxUsableMagic;
+    }
+
+    public synchronized byte maxUsableProduceMagic() {
+        return maxUsableProduceMagic;
+    }
+
+}
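
A minimal standalone sketch (not part of the patch) of how the new ApiVersions class is meant to be used: register each broker's advertised versions, then build batches with the newest magic value every known broker accepts, mirroring computeMaxUsableProduceMagic() above. The class and constant names come from this diff; the node id and the use of NodeApiVersions.create() are illustrative.

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.record.RecordBatch;

public class MaxUsableMagicSketch {
    public static void main(String[] args) {
        ApiVersions apiVersions = new ApiVersions();
        // Record the versions advertised by broker "0" (illustrative node id);
        // NodeApiVersions.create() reports the client's own latest supported versions.
        apiVersions.update("0", NodeApiVersions.create());

        // The producer uses the smallest usable produce magic across all known brokers,
        // so batches rarely need to be converted to an older format on the broker.
        byte magic = apiVersions.maxUsableProduceMagic();
        System.out.println("produce with magic v" + magic
                + ", v2 format available: " + (magic >= RecordBatch.MAGIC_VALUE_V2));
    }
}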

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 83a0009..9d63d43 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.clients;
 
-import java.io.Closeable;
-import java.util.List;
-
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.requests.AbstractRequest;
 
+import java.io.Closeable;
+import java.util.List;
+
 /**
  * The interface for {@link NetworkClient}
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b6f8b0e..a279fe4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -42,7 +42,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -99,7 +98,7 @@ public class NetworkClient implements KafkaClient {
      */
     private final boolean discoverBrokerVersions;
 
-    private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
+    private final ApiVersions apiVersions;
 
     private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>();
 
@@ -114,9 +113,10 @@ public class NetworkClient implements KafkaClient {
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time,
-                         boolean discoverBrokerVersions) {
-        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions);
+                         boolean discoverBrokerVersions,
+                         ApiVersions apiVersions) {
+        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
+                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions, apiVersions);
     }
 
     public NetworkClient(Selectable selector,
@@ -128,9 +128,10 @@ public class NetworkClient implements KafkaClient {
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time,
-                         boolean discoverBrokerVersions) {
+                         boolean discoverBrokerVersions,
+                         ApiVersions apiVersions) {
         this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions);
+                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, discoverBrokerVersions, apiVersions);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -143,7 +144,8 @@ public class NetworkClient implements KafkaClient {
                           int socketReceiveBuffer,
                           int requestTimeoutMs,
                           Time time,
-                          boolean discoverBrokerVersions) {
+                          boolean discoverBrokerVersions,
+                          ApiVersions apiVersions) {
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
          * super constructor is invoked.
@@ -167,6 +169,7 @@ public class NetworkClient implements KafkaClient {
         this.reconnectBackoffMs = reconnectBackoffMs;
         this.time = time;
         this.discoverBrokerVersions = discoverBrokerVersions;
+        this.apiVersions = apiVersions;
     }
 
     /**
@@ -285,7 +288,7 @@ public class NetworkClient implements KafkaClient {
         }
         AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
         try {
-            NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
+            NodeApiVersions versionInfo = apiVersions.get(nodeId);
             short version;
             // Note: if versionInfo is null, we have no server version information. This would be
             // the case when sending the initial ApiVersionRequest which fetches the version
@@ -296,7 +299,7 @@ public class NetworkClient implements KafkaClient {
                     log.trace("No version information found when sending message of type {} to node {}. " +
                             "Assuming version {}.", clientRequest.apiKey(), nodeId, version);
             } else {
-                version = versionInfo.usableVersion(clientRequest.apiKey());
+                version = versionInfo.usableVersion(clientRequest.apiKey(), builder.desiredVersion());
             }
             // The call to build may also throw UnsupportedVersionException, if there are essential
             // fields that cannot be represented in the chosen version.
@@ -482,7 +485,7 @@ public class NetworkClient implements KafkaClient {
      */
     private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
         connectionStates.disconnected(nodeId, now);
-        nodeApiVersions.remove(nodeId);
+        apiVersions.remove(nodeId);
         nodesNeedingApiVersionsFetch.remove(nodeId);
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
             log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId);
@@ -568,7 +571,7 @@ public class NetworkClient implements KafkaClient {
             return;
         }
         NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
-        nodeApiVersions.put(node, nodeVersionInfo);
+        apiVersions.update(node, nodeVersionInfo);
         this.connectionStates.ready(node);
         if (log.isDebugEnabled()) {
             log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index f216c60..dc2e6d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -21,27 +21,24 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
 import org.apache.kafka.common.utils.Utils;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.TreeMap;
 
 /**
  * An internal class which represents the API versions supported by a particular node.
  */
 public class NodeApiVersions {
-    private static final short NODE_TOO_OLD = (short) -1;
-    private static final short NODE_TOO_NEW = (short) -2;
-    private final Collection<ApiVersion> nodeApiVersions;
+    // A map of the usable versions of each API, keyed by the ApiKeys instance
+    private final Map<ApiKeys, UsableVersion> usableVersions = new EnumMap<>(ApiKeys.class);
 
-    /**
-     * An array of the usable versions of each API, indexed by the ApiKeys ID.
-     */
-    private final Map<ApiKeys, Short> usableVersions = new EnumMap<>(ApiKeys.class);
+    // List of APIs which the broker supports, but which are unknown to the client
+    private final List<ApiVersion> unknownApis = new ArrayList<>();
 
     /**
      * Create a NodeApiVersions object with the current ApiVersions.
@@ -77,19 +74,13 @@ public class NodeApiVersions {
     }
 
     public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) {
-        this.nodeApiVersions = nodeApiVersions;
         for (ApiVersion nodeApiVersion : nodeApiVersions) {
-            // Newer brokers may support ApiKeys we don't know about, ignore them
             if (ApiKeys.hasId(nodeApiVersion.apiKey)) {
                 ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey);
-                short v = Utils.min(nodeApiKey.latestVersion(), nodeApiVersion.maxVersion);
-                if (v < nodeApiVersion.minVersion) {
-                    usableVersions.put(nodeApiKey, NODE_TOO_NEW);
-                } else if (v < nodeApiKey.oldestVersion()) {
-                    usableVersions.put(nodeApiKey, NODE_TOO_OLD);
-                } else {
-                    usableVersions.put(nodeApiKey, v);
-                }
+                usableVersions.put(nodeApiKey, new UsableVersion(nodeApiKey, nodeApiVersion));
+            } else {
+                // Newer brokers may support ApiKeys we don't know about
+                unknownApis.add(nodeApiVersion);
             }
         }
     }
@@ -98,22 +89,29 @@ public class NodeApiVersions {
      * Return the most recent version supported by both the node and the local software.
      */
     public short usableVersion(ApiKeys apiKey) {
-        Short usableVersion = usableVersions.get(apiKey);
+        return usableVersion(apiKey, null);
+    }
+
+    /**
+     * Return the desired version (if usable) or the latest usable version if the desired version is null.
+     */
+    public short usableVersion(ApiKeys apiKey, Short desiredVersion) {
+        UsableVersion usableVersion = usableVersions.get(apiKey);
         if (usableVersion == null)
             throw new UnsupportedVersionException("The broker does not support " + apiKey);
-        else if (usableVersion == NODE_TOO_OLD)
-            throw new UnsupportedVersionException("The broker is too old to support " + apiKey +
-                " version " + apiKey.oldestVersion());
-        else if (usableVersion == NODE_TOO_NEW)
-            throw new UnsupportedVersionException("The broker is too new to support " + apiKey +
-                " version " + apiKey.latestVersion());
-        else
-            return usableVersion;
+
+        if (desiredVersion == null) {
+            usableVersion.ensureUsable();
+            return usableVersion.value;
+        } else {
+            usableVersion.ensureUsable(desiredVersion);
+            return desiredVersion;
+        }
     }
 
     /**
      * Convert the object to a string with no linebreaks.<p/>
-     *
+     * <p>
      * This toString method is relatively expensive, so avoid calling it unless debug logging is turned on.
      */
     @Override
@@ -131,7 +129,9 @@ public class NodeApiVersions {
         // a TreeMap before printing it out to ensure that we always print in
         // ascending order.
         TreeMap<Short, String> apiKeysText = new TreeMap<>();
-        for (ApiVersion apiVersion : this.nodeApiVersions)
+        for (UsableVersion usableVersion : this.usableVersions.values())
+            apiKeysText.put(usableVersion.apiVersion.apiKey, apiVersionToText(usableVersion.apiVersion));
+        for (ApiVersion apiVersion : unknownApis)
             apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion));
 
         // Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
@@ -173,23 +173,75 @@ public class NodeApiVersions {
         }
 
         if (apiKey != null) {
-            Short usableVersion = usableVersions.get(apiKey);
-            if (usableVersion == NODE_TOO_OLD)
+            UsableVersion usableVersion = usableVersions.get(apiKey);
+            if (usableVersion.isTooOld())
                 bld.append(" [unusable: node too old]");
-            else if (usableVersion == NODE_TOO_NEW)
+            else if (usableVersion.isTooNew())
                 bld.append(" [unusable: node too new]");
             else
-                bld.append(" [usable: ").append(usableVersion).append("]");
+                bld.append(" [usable: ").append(usableVersion.value).append("]");
         }
         return bld.toString();
     }
 
+    /**
+     * Get the version information for a given API.
+     *
+     * @param apiKey The api key to lookup
+     * @return The api version information from the broker or null if it is unsupported
+     */
     public ApiVersion apiVersion(ApiKeys apiKey) {
-        for (ApiVersion nodeApiVersion : nodeApiVersions) {
-            if (nodeApiVersion.apiKey == apiKey.id) {
-                return nodeApiVersion;
+        UsableVersion usableVersion = usableVersions.get(apiKey);
+        if (usableVersion == null)
+            return null;
+        return usableVersion.apiVersion;
+    }
+
+    private static class UsableVersion {
+        private static final short NODE_TOO_OLD = (short) -1;
+        private static final short NODE_TOO_NEW = (short) -2;
+
+        private final ApiKeys apiKey;
+        private final ApiVersion apiVersion;
+        private final Short value;
+
+        private UsableVersion(ApiKeys apiKey, ApiVersion nodeApiVersion) {
+            this.apiKey = apiKey;
+            this.apiVersion = nodeApiVersion;
+            short v = Utils.min(apiKey.latestVersion(), nodeApiVersion.maxVersion);
+            if (v < nodeApiVersion.minVersion) {
+                this.value = NODE_TOO_NEW;
+            } else if (v < apiKey.oldestVersion()) {
+                this.value = NODE_TOO_OLD;
+            } else {
+                this.value = v;
             }
         }
-        throw new NoSuchElementException();
+
+        private boolean isTooOld() {
+            return value == NODE_TOO_OLD;
+        }
+
+        private boolean isTooNew() {
+            return value == NODE_TOO_NEW;
+        }
+
+        private void ensureUsable() {
+            if (value == NODE_TOO_OLD)
+                throw new UnsupportedVersionException("The broker is too old to support " + apiKey +
+                        " version " + apiKey.oldestVersion());
+            else if (value == NODE_TOO_NEW)
+                throw new UnsupportedVersionException("The broker is too new to support " + apiKey +
+                        " version " + apiKey.latestVersion());
+        }
+
+        private void ensureUsable(short desiredVersion) {
+            if (apiVersion.minVersion > desiredVersion || apiVersion.maxVersion < desiredVersion)
+                throw new UnsupportedVersionException("The broker does not support the requested version " + desiredVersion +
+                        " for api " + apiKey + ". Supported versions are " + apiVersion.minVersion +
+                        " to " + apiVersion.maxVersion + ".");
+        }
+
     }
+
 }
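
A self-contained sketch (plain shorts instead of the ApiKeys/ApiVersion types; names are illustrative) restating the UsableVersion negotiation above: take the newest version both sides implement, then fail if that version still falls outside either side's supported range.

public class VersionNegotiationSketch {
    static short usableVersion(short clientOldest, short clientLatest,
                               short brokerMin, short brokerMax) {
        short v = (short) Math.min(clientLatest, brokerMax);
        if (v < brokerMin)
            throw new IllegalStateException("node too new");  // broker no longer supports our versions
        if (v < clientOldest)
            throw new IllegalStateException("node too old");  // we no longer support the broker's versions
        return v;
    }

    public static void main(String[] args) {
        // Client implements versions 0..3 of an API, the broker advertises 1..2 -> use 2.
        System.out.println(usableVersion((short) 0, (short) 3, (short) 1, (short) 2));
    }
}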

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 26c3768..fe9ede8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 
 /**
@@ -25,7 +25,7 @@ import org.apache.kafka.common.record.TimestampType;
  * to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord.
  */
 public class ConsumerRecord<K, V> {
-    public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;
+    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
     public static final int NULL_SIZE = -1;
     public static final int NULL_CHECKSUM = -1;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 612f446..3894b5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -663,13 +664,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                     this.metadata,
                     clientId,
-                    100, // a fixed large enough value will suffice
+                    100, // a fixed large enough value will suffice for max in-flight requests
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     time,
-                    true);
+                    true,
+                    new ApiVersions());
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 441206a..7236653 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -45,7 +45,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -769,21 +769,34 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
 
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                 boolean skippedRecords = false;
-                for (LogEntry logEntry : partition.records.deepEntries()) {
-                    // Skip the messages earlier than current position.
-                    if (logEntry.offset() >= position) {
-                        parsed.add(parseRecord(tp, logEntry));
-                        bytes += logEntry.sizeInBytes();
-                    } else
-                        skippedRecords = true;
+                for (RecordBatch batch : partition.records.batches()) {
+                    if (this.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+                        try {
+                            batch.ensureValid();
+                        } catch (InvalidRecordException e) {
+                            throw new KafkaException("Record batch for partition " + partition + " at offset " +
+                                    batch.baseOffset() + " is invalid, cause: " + e.getMessage());
+                        }
+                    }
+
+                    for (Record record : batch) {
+                        // control records should not be returned to the user. also skip anything out of range
+                        if (record.isControlRecord() || record.offset() < position) {
+                            skippedRecords = true;
+                        } else {
+                            parsed.add(parseRecord(tp, batch, record));
+                            bytes += record.sizeInBytes();
+                        }
+                    }
                 }
 
                 recordsCount = parsed.size();
 
-                log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
+                log.trace("Adding {} fetched record(s) for partition {} with offset {} to buffered record list",
+                        parsed.size(), tp, position);
                 parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
 
-                if (parsed.isEmpty() && !skippedRecords && (partition.records.sizeInBytes() > 0)) {
+                if (parsed.isEmpty() && !skippedRecords && partition.records.sizeInBytes() > 0) {
                     if (completedFetch.responseVersion < 3) {
                         // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                         Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
@@ -842,25 +855,22 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
         return parsedRecords;
     }
 
-    /**
-     * Parse the record entry, deserializing the key / value fields if necessary
-     */
-    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
-        Record record = logEntry.record();
-
+    private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
+                                             RecordBatch batch,
+                                             Record record) {
         if (this.checkCrcs) {
             try {
                 record.ensureValid();
             } catch (InvalidRecordException e) {
-                throw new KafkaException("Record for partition " + partition + " at offset " + logEntry.offset()
+                throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
                         + " is invalid, cause: " + e.getMessage());
             }
         }
 
         try {
-            long offset = logEntry.offset();
+            long offset = record.offset();
             long timestamp = record.timestamp();
-            TimestampType timestampType = record.timestampType();
+            TimestampType timestampType = batch.timestampType();
             ByteBuffer keyBytes = record.key();
             byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
             K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
@@ -875,7 +885,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                                         key, value);
         } catch (RuntimeException e) {
             throw new SerializationException("Error deserializing key/value for partition " + partition +
-                    " at offset " + logEntry.offset(), e);
+                    " at offset " + record.offset(), e);
         }
     }
 

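A simplified sketch of the batch-oriented traversal the Fetcher now performs (the helper name and fetchPosition parameter are illustrative, not the actual Fetcher API): with the new format the CRC is validated once per batch, and control records or records before the fetch position are dropped before anything reaches the application.

import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;

import java.util.ArrayList;
import java.util.List;

public class FetchTraversalSketch {
    static List<Record> readableRecords(Records records, long fetchPosition, boolean checkCrcs) {
        List<Record> result = new ArrayList<>();
        for (RecordBatch batch : records.batches()) {
            if (checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
                batch.ensureValid();          // v2 checksums cover the whole batch
            for (Record record : batch) {
                if (record.isControlRecord() || record.offset() < fetchPosition)
                    continue;                 // skipped, never surfaced to the user
                if (checkCrcs && batch.magic() < RecordBatch.MAGIC_VALUE_V2)
                    record.ensureValid();     // older formats carry a per-record checksum
                result.add(record);
            }
        }
        return result;
    }
}
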
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 33da0c4..78dd668 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -43,9 +44,9 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -157,6 +158,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final long maxBlockTimeMs;
     private final int requestTimeoutMs;
     private final ProducerInterceptors<K, V> interceptors;
+    private final ApiVersions apiVersions;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -296,19 +298,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             }
 
+            this.apiVersions = new ApiVersions();
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.totalMemorySize,
                     this.compressionType,
                     config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                     retryBackoffMs,
                     metrics,
-                    time);
+                    time,
+                    apiVersions);
 
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
             NetworkClient client = new NetworkClient(
-                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
+                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                            this.metrics, time, "producer", channelBuilder),
                     this.metadata,
                     clientId,
                     config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
@@ -317,7 +322,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                     this.requestTimeoutMs,
                     time,
-                    true);
+                    true,
+                    apiVersions);
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
@@ -327,7 +333,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.RETRIES_CONFIG),
                     this.metrics,
                     Time.SYSTEM,
-                    this.requestTimeoutMs);
+                    this.requestTimeoutMs,
+                    apiVersions);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
@@ -472,14 +479,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
 
             int partition = partition(record, serializedKey, serializedValue, cluster);
-            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
+            int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
+                    serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             tp = new TopicPartition(record.topic(), partition);
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             // producer callback will make sure to call both 'callback' and interceptor callback
             Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
+                    serializedValue, interceptCallback, remainingWaitMs);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -824,7 +833,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (this.interceptors != null) {
                 if (metadata == null) {
-                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
+                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
                                                         exception);
                 } else {
                     this.interceptors.onAcknowledgement(metadata, exception);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 35f5d94..22797b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -16,6 +16,17 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
+import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.serialization.Serializer;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -26,17 +37,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.serialization.Serializer;
-
 
 /**
  * A mock of the producer interface you can use for testing code that uses Kafka.
@@ -118,10 +118,10 @@ public class MockProducer<K, V> implements Producer<K, V> {
             partition = partition(record, this.cluster);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         ProduceRequestResult result = new ProduceRequestResult(topicPartition);
-        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP, 0, 0, 0);
+        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
         long offset = nextOffset(topicPartition);
         Completion completion = new Completion(offset,
-                                               new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP, 0, 0, 0),
+                                               new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0),
                                                result, callback);
         this.sent.add(record);
         if (autoComplete)
@@ -245,7 +245,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
         }
 
         public void complete(RuntimeException e) {
-            result.set(e == null ? offset : -1L, Record.NO_TIMESTAMP, e);
+            result.set(e == null ? offset : -1L, RecordBatch.NO_TIMESTAMP, e);
             if (callback != null) {
                 if (e == null)
                     callback.onCompletion(metadata, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index db34487..2d06ea8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 
 /**
  * The metadata for a record that has been acknowledged by the server
@@ -54,7 +54,7 @@ public final class RecordMetadata {
 
     @Deprecated
     public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
-        this(topicPartition, baseOffset, relativeOffset, Record.NO_TIMESTAMP, -1, -1, -1);
+        this(topicPartition, baseOffset, relativeOffset, RecordBatch.NO_TIMESTAMP, -1, -1, -1);
     }
 
     public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index 0a73c41..fbfef61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -35,7 +35,7 @@ public final class ProduceRequestResult {
     private final TopicPartition topicPartition;
 
     private volatile Long baseOffset = null;
-    private volatile long logAppendTime = Record.NO_TIMESTAMP;
+    private volatile long logAppendTime = RecordBatch.NO_TIMESTAMP;
     private volatile RuntimeException error;
 
     /**
@@ -97,7 +97,7 @@ public final class ProduceRequestResult {
      * Return true if log append time is being used for this topic
      */
     public boolean hasLogAppendTime() {
-        return logAppendTime != Record.NO_TIMESTAMP;
+        return logAppendTime != RecordBatch.NO_TIMESTAMP;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 46273a1..2b90f81 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -20,9 +20,10 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,11 +74,11 @@ public final class ProducerBatch {
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
     public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
-        if (!recordsBuilder.hasRoomFor(key, value)) {
+        if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
             return null;
         } else {
             long checksum = this.recordsBuilder.append(timestamp, key, value);
-            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
+            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value));
             this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                    timestamp, checksum,
@@ -173,7 +174,7 @@ public final class ProducerBatch {
     void expirationDone() {
         if (expiryErrorMessage == null)
             throw new IllegalStateException("Batch has not expired");
-        this.done(-1L, Record.NO_TIMESTAMP,
+        this.done(-1L, RecordBatch.NO_TIMESTAMP,
                   new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
     }
 
@@ -243,4 +244,7 @@ public final class ProducerBatch {
         return !recordsBuilder.isClosed();
     }
 
+    public byte magic() {
+        return recordsBuilder.magic();
+    }
 }
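
A minimal sketch (standalone usage with arbitrary values, not the producer internals) of the size estimate that KafkaProducer.ensureValidRecordSize and ProducerBatch.tryAppend now share: sizeInBytesUpperBound yields an upper bound because the exact encoded size of a record can depend on the batch it eventually lands in, so the magic value has to be supplied up front.

import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.RecordBatch;

import java.nio.charset.StandardCharsets;

public class SizeEstimateSketch {
    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] value = new byte[1024];
        // Worst-case size of a single record in the current magic format.
        int upperBound = AbstractRecords.sizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE, key, value);
        int maxRequestSize = 1048576;  // stand-in for the max.request.size config
        if (upperBound > maxRequestSize)
            throw new IllegalArgumentException("record too large: " + upperBound + " bytes");
        System.out.println("estimated upper bound = " + upperBound + " bytes");
    }
}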

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index da3be01..e4ab4c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.ProducerInterceptor;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +113,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
                         interceptTopicPartition = new TopicPartition(record.topic(),
                                                                      record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
                     }
-                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
+                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
                                                   exception);
                 }
             } catch (Exception e) {

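For interceptor authors, the practical effect of the change above is unchanged: when a send fails before metadata is known, onAcknowledgement still receives placeholder metadata with -1 offsets and a -1 (NO_TIMESTAMP) timestamp, now sourced from RecordBatch. A small sketch of an interceptor that treats those placeholders accordingly (the class name is illustrative):

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch: on failed sends the metadata fields are -1 placeholders, so only the
    // topic/partition (and the exception) carry real information.
    public class LoggingInterceptor<K, V> implements ProducerInterceptor<K, V> {

        @Override
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
            return record; // pass through unchanged
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception != null || metadata.offset() < 0) {
                System.err.println("Send failed for " + metadata.topic() + "-" + metadata.partition());
            } else {
                System.out.println("Acked " + metadata.topic() + "-" + metadata.partition()
                        + " at offset " + metadata.offset() + ", timestamp " + metadata.timestamp());
            }
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }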
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5d95f53..1e495d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -27,11 +28,11 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
@@ -73,6 +74,7 @@ public final class RecordAccumulator {
     private final long retryBackoffMs;
     private final BufferPool free;
     private final Time time;
+    private final ApiVersions apiVersions;
     private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
     private final IncompleteBatches incomplete;
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
@@ -92,6 +94,7 @@ public final class RecordAccumulator {
      *        exhausting all retries in a short period of time.
      * @param metrics The metrics
      * @param time The time instance to use
+     * @param apiVersions Request API versions of the currently connected brokers
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
@@ -99,7 +102,8 @@ public final class RecordAccumulator {
                              long lingerMs,
                              long retryBackoffMs,
                              Metrics metrics,
-                             Time time) {
+                             Time time,
+                             ApiVersions apiVersions) {
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
@@ -114,6 +118,7 @@ public final class RecordAccumulator {
         this.incomplete = new IncompleteBatches();
         this.muted = new HashSet<>();
         this.time = time;
+        this.apiVersions = apiVersions;
         registerMetrics(metrics, metricGrpName);
     }
 
@@ -182,7 +187,8 @@ public final class RecordAccumulator {
             }
 
             // we don't have an in-progress record batch try to allocate a new batch
-            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
+            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+            int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
             buffer = free.allocate(size, maxTimeToBlock);
             synchronized (dq) {
@@ -196,7 +202,8 @@ public final class RecordAccumulator {
                     return appendResult;
                 }
 
-                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
+                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, maxUsableMagic, compression,
+                        TimestampType.CREATE_TIME, this.batchSize);
                 ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
 
@@ -526,7 +533,7 @@ public final class RecordAccumulator {
                 batch.close();
                 dq.remove(batch);
             }
-            batch.done(-1L, Record.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));
+            batch.done(-1L, RecordBatch.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));
             deallocate(batch);
         }
     }

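The accumulator now sizes new buffers using the highest magic version that all connected brokers can accept, obtained from ApiVersions.maxUsableProduceMagic(); the allocation is still at least batch.size, but a single record larger than that still gets a buffer big enough to hold it. A self-contained sketch of how such a "max usable magic" can be derived from broker-advertised ProduceRequest versions; the version-to-magic mapping below is stated as an assumption (v3 being the first version that carries format v2):

    import java.util.Collection;

    // Sketch: each broker advertises the highest ProduceRequest version it supports; the
    // client must write with the lowest common message format across all of them.
    final class UsableMagicSketch {
        static byte produceMagicFor(short maxProduceRequestVersion) {
            if (maxProduceRequestVersion >= 3)
                return 2; // message format v2 (KIP-98)
            if (maxProduceRequestVersion >= 2)
                return 1; // format v1 (relative offsets, timestamps)
            return 0;     // original format
        }

        static byte maxUsableProduceMagic(Collection<Short> maxProduceVersionPerBroker) {
            byte usable = 2;
            for (short version : maxProduceVersionPerBroker)
                usable = (byte) Math.min(usable, produceMagicFor(version));
            return usable;
        }
    }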
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3604f68..a7394e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
@@ -95,6 +96,9 @@ public class Sender implements Runnable {
     /* the max time to wait for the server to respond to the request*/
     private final int requestTimeout;
 
+    /* current request API versions supported by the known brokers */
+    private final ApiVersions apiVersions;
+
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
@@ -104,7 +108,8 @@ public class Sender implements Runnable {
                   int retries,
                   Metrics metrics,
                   Time time,
-                  int requestTimeout) {
+                  int requestTimeout,
+                  ApiVersions apiVersions) {
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;
@@ -116,6 +121,7 @@ public class Sender implements Runnable {
         this.time = time;
         this.sensors = new SenderMetrics(metrics);
         this.requestTimeout = requestTimeout;
+        this.apiVersions = apiVersions;
     }
 
     /**
@@ -162,8 +168,7 @@ public class Sender implements Runnable {
     /**
      * Run a single iteration of sending
      * 
-     * @param now
-     *            The current POSIX time in milliseconds
+     * @param now The current POSIX time in milliseconds
      */
     void run(long now) {
         Cluster cluster = metadata.fetch();
@@ -347,16 +352,38 @@ public class Sender implements Runnable {
      * Create a produce request from the given record batches
      */
     private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
+        if (batches.isEmpty())
+            return;
+
         Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
         final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
+
+        // find the minimum magic version used when creating the record sets
+        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
+        for (ProducerBatch batch : batches) {
+            if (batch.magic() < minUsedMagic)
+                minUsedMagic = batch.magic();
+        }
+
         for (ProducerBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
-            produceRecordsByPartition.put(tp, batch.records());
+            MemoryRecords records = batch.records();
+
+            // Down-convert if necessary to the minimum magic used. In general, there can be a delay between the time
+            // that the producer starts building the batch and the time that we send the request, and we may have
+            // chosen the message format based on outdated metadata. In the worst case, we optimistically chose to use
+            // the new message format, but found that the broker didn't support it, so we need to down-convert on the
+            // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
+            // not all support the same message format version. For example, if a partition migrates from a broker
+            // which supports the new magic version to one which doesn't, then we will need to convert.
+            if (!records.hasMatchingMagic(minUsedMagic))
+                records = batch.records().downConvert(minUsedMagic);
+            produceRecordsByPartition.put(tp, records);
             recordsByPartition.put(tp, batch);
         }
 
-        ProduceRequest.Builder requestBuilder =
-                new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
+        ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(minUsedMagic, acks, timeout,
+                produceRecordsByPartition);
         RequestCompletionHandler callback = new RequestCompletionHandler() {
             public void onComplete(ClientResponse response) {
                 handleProduceResponse(response, recordsByPartition, time.milliseconds());

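The request-building logic above can be read on its own: pick the lowest magic across the batches going into one ProduceRequest, and down-convert any batch written in a newer format before handing it to the request builder. A self-contained sketch of that selection step, with a hypothetical Batch interface standing in for ProducerBatch/MemoryRecords:

    import java.util.List;

    // Sketch of the min-magic selection and normalization performed in sendProduceRequest.
    final class DownConvertSketch {
        interface Batch {
            byte magic();
            Batch downConvert(byte toMagic); // stand-in for MemoryRecords.downConvert
        }

        static byte chooseRequestMagic(byte maxUsableMagic, List<Batch> batches) {
            // Start from the best magic the brokers can accept, then lower it to the
            // oldest format actually used by a batch in this request.
            byte minUsedMagic = maxUsableMagic;
            for (Batch batch : batches)
                if (batch.magic() < minUsedMagic)
                    minUsedMagic = batch.magic();
            return minUsedMagic;
        }

        static void normalize(List<Batch> batches, byte minUsedMagic) {
            // Down-convert batches built in a newer format (e.g. against stale metadata).
            for (int i = 0; i < batches.size(); i++)
                if (batches.get(i).magic() != minUsedMagic)
                    batches.set(i, batches.get(i).downConvert(minUsedMagic));
        }
    }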
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3343133..8c3e08c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -31,6 +31,7 @@ import static org.apache.kafka.common.protocol.types.Type.BYTES;
 import static org.apache.kafka.common.protocol.types.Type.INT16;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
 import static org.apache.kafka.common.protocol.types.Type.RECORDS;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@@ -165,6 +166,21 @@ public class Protocol {
      */
     public static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
 
+    // Produce request V3 adds the transactional id which is used for authorization when attempting to write
+    // transactional data. This version also adds support for message format V2.
+    public static final Schema PRODUCE_REQUEST_V3 = new Schema(
+            new Field("transactional_id",
+                    NULLABLE_STRING,
+                    "The transactional ID of the producer. This is used to authorize transaction produce requests. " +
+                    "This can be null for non-transactional producers."),
+            new Field("acks",
+                    INT16,
+                    "The number of acknowledgments the producer requires the leader to have received before " +
+                    "considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader " +
+                    "and -1 for the full ISR."),
+            new Field("timeout", INT32, "The time to await a response in ms."),
+            new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
     public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
                                                                           new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                                  new Field("partition_responses",
@@ -204,8 +220,10 @@ public class Protocol {
                                                                           "Duration in milliseconds for which the request was throttled" +
                                                                               " due to quota violation. (Zero if the request did not violate any quota.)",
                                                                           0));
-    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2};
-    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2};
+    public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
+
+    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
+    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3};
 
     /* Offset commit api */
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -499,7 +517,7 @@ public class Protocol {
     // Only the version number is incremented to indicate the client support message format V1 which uses
     // relative offset and has timestamp.
     public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
-    // FETCH_REQUEST_V3 added top level max_bytes field - the total size of partition data to accumulate in response.
+    // Fetch Request V3 added top level max_bytes field - the total size of partition data to accumulate in response.
     // The partition ordering is now relevant - partitions will be processed in the order they appear in the request.
     public static final Schema FETCH_REQUEST_V3 = new Schema(new Field("replica_id",
                                                                        INT32,
@@ -519,6 +537,34 @@ public class Protocol {
                                                                        new ArrayOf(FETCH_REQUEST_TOPIC_V0),
                                                                        "Topics to fetch in the order provided."));
 
+    // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response).
+    public static final Schema FETCH_REQUEST_V4 = new Schema(
+            new Field("replica_id",
+                    INT32,
+                    "Broker id of the follower. For normal consumers, use -1."),
+            new Field("max_wait_time",
+                    INT32,
+                    "Maximum time in ms to wait for the response."),
+            new Field("min_bytes",
+                    INT32,
+                    "Minimum bytes to accumulate in the response."),
+            new Field("max_bytes",
+                    INT32,
+                    "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+                    "if the first message in the first non-empty partition of the fetch is larger than this " +
+                    "value, the message will still be returned to ensure that progress can be made."),
+            new Field("isolation_level",
+                    INT8,
+                    "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+                     "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+                     "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+                     "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+                     "consumers to discard ABORTED transactional records"),
+            new Field("topics",
+                    new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+                    "Topics to fetch in the order provided."));
+
     public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
                                                                                          INT32,
                                                                                          "Topic partition id."),
@@ -549,8 +595,46 @@ public class Protocol {
     public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
     public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
 
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3};
+
+    // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
+    // last stable offset). It also exposes messages with magic v2 (along with older formats).
+    private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
+            new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
+            new Field("first_offset", INT64, "The first offset in the aborted transaction"));
+
+    public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
+            new Field("partition",
+                    INT32,
+                    "Topic partition id."),
+            new Field("error_code", INT16),
+            new Field("high_watermark",
+                    INT64,
+                    "Last committed offset."),
+            new Field("last_stable_offset",
+                    INT64,
+                    "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
+                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
+            new Field("aborted_transactions",
+                    ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
+
+    public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
+            new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
+            new Field("record_set", RECORDS));
+
+    public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
+            new Field("topic", STRING),
+            new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
+
+    public static final Schema FETCH_RESPONSE_V4 = new Schema(
+            new Field("throttle_time_ms",
+                    INT32,
+                    "Duration in milliseconds for which the request was throttled " +
+                    "due to quota violation (zero if the request did not violate any quota).",
+                    0),
+            new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
+
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();

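Since the isolation_level field above is just an int8 on the wire, the READ_UNCOMMITTED/READ_COMMITTED values map directly to 0 and 1. A small sketch of that mapping as a client-side enum (the name is illustrative; the id values come from the field documentation above):

    // Sketch of the isolation_level byte values carried by Fetch v4 requests.
    enum IsolationLevelSketch {
        READ_UNCOMMITTED((byte) 0), // all records visible, including open/aborted transactions
        READ_COMMITTED((byte) 1);   // only records below the LSO; aborted data can be discarded

        final byte id;

        IsolationLevelSketch(byte id) {
            this.id = id;
        }

        static IsolationLevelSketch forId(byte id) {
            switch (id) {
                case 0: return READ_UNCOMMITTED;
                case 1: return READ_COMMITTED;
                default: throw new IllegalArgumentException("Unknown isolation level: " + id);
            }
        }
    }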
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 39e46fd..ffca09c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -16,14 +16,14 @@
  */
 package org.apache.kafka.common.protocol.types;
 
-import java.nio.ByteBuffer;
-
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Utils;
 
+import java.nio.ByteBuffer;
+
 /**
  * A serializable type
  */
@@ -235,16 +235,15 @@ public abstract class Type {
         }
 
         @Override
-        public Object read(ByteBuffer buffer) {
+        public String read(ByteBuffer buffer) {
             short length = buffer.getShort();
             if (length < 0)
                 throw new SchemaException("String length " + length + " cannot be negative");
             if (length > buffer.remaining())
                 throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available");
-
-            byte[] bytes = new byte[length];
-            buffer.get(bytes);
-            return Utils.utf8(bytes);
+            String result = Utils.utf8(buffer, length);
+            buffer.position(buffer.position() + length);
+            return result;
         }
 
         @Override
@@ -287,16 +286,15 @@ public abstract class Type {
         }
 
         @Override
-        public Object read(ByteBuffer buffer) {
+        public String read(ByteBuffer buffer) {
             short length = buffer.getShort();
             if (length < 0)
                 return null;
             if (length > buffer.remaining())
                 throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available");
-
-            byte[] bytes = new byte[length];
-            buffer.get(bytes);
-            return Utils.utf8(bytes);
+            String result = Utils.utf8(buffer, length);
+            buffer.position(buffer.position() + length);
+            return result;
         }
 
         @Override
@@ -443,7 +441,7 @@ public abstract class Type {
         }
 
         @Override
-        public Object read(ByteBuffer buffer) {
+        public Records read(ByteBuffer buffer) {
             ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer);
             return MemoryRecords.readableRecords(recordsBuffer);
         }

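The STRING/NULLABLE_STRING change above reads the UTF-8 bytes directly out of the buffer via Utils.utf8(buffer, length) and then advances the position, instead of copying into a temporary byte[] first. A self-contained sketch of the same technique using only JDK calls in place of the Kafka utility:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Sketch: decode a short-length-prefixed UTF-8 string from a ByteBuffer without an
    // explicit intermediate byte[] copy, then advance the buffer past the decoded bytes.
    final class Utf8ReadSketch {
        static String readShortPrefixedString(ByteBuffer buffer) {
            short length = buffer.getShort();
            if (length < 0)
                throw new IllegalArgumentException("String length " + length + " cannot be negative");
            if (length > buffer.remaining())
                throw new IllegalArgumentException("Only " + buffer.remaining() + " bytes available");
            ByteBuffer view = buffer.slice(); // bounded view starting at the current position
            view.limit(length);
            String result = StandardCharsets.UTF_8.decode(view).toString();
            buffer.position(buffer.position() + length);
            return result;
        }
    }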
