kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-2071: Replace Producer Request/Response with their org.apache.kafka.common.requests equivalents
Date Thu, 21 Jan 2016 00:28:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 959cf09e8 -> 79cda0472


KAFKA-2071: Replace Producer Request/Response with their org.apache.kafka.common.requests equivalents

This PR replaces all occurrences of kafka.api.ProducerRequest/ProducerResponse by their common equivalents.

Author: David Jacot <david.jacot@gmail.com>

Reviewers: Grant Henke <granthenke@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #110 from dajac/KAFKA-2071


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79cda047
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79cda047
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79cda047

Branch: refs/heads/trunk
Commit: 79cda0472b8e3af915c45daca9cce82d7c964182
Parents: 959cf09
Author: David Jacot <david.jacot@gmail.com>
Authored: Wed Jan 20 16:27:34 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Jan 20 16:27:34 2016 -0800

----------------------------------------------------------------------
 .../kafka/common/requests/ProduceRequest.java   |  8 +-
 .../kafka/common/requests/ProduceResponse.java  |  2 +-
 .../kafka/common/requests/RequestSend.java      |  2 +-
 .../common/requests/RequestResponseTest.java    |  2 +-
 .../coordinator/GroupMetadataManager.scala      | 53 +++++++------
 .../scala/kafka/network/RequestChannel.scala    | 19 ++---
 .../kafka/server/DelayedOperationKey.scala      |  3 +
 .../scala/kafka/server/DelayedProduce.scala     | 30 +++----
 .../src/main/scala/kafka/server/KafkaApis.scala | 83 +++++++++++---------
 .../scala/kafka/server/ReplicaManager.scala     | 61 +++++++-------
 .../GroupCoordinatorResponseTest.scala          | 20 ++---
 .../unit/kafka/network/SocketServerTest.scala   | 74 ++++++++++-------
 .../unit/kafka/server/ReplicaManagerTest.scala  | 21 +++--
 13 files changed, 208 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 0581f84..a915247 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -103,7 +103,9 @@ public class ProduceRequest extends AbstractRequest {
 
         switch (versionId) {
             case 0:
-                return new ProduceResponse(responseMap, 0);
+                return new ProduceResponse(responseMap);
+            case 1:
+                return new ProduceResponse(responseMap, ProduceResponse.DEFAULT_THROTTLE_TIME);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
@@ -122,6 +124,10 @@ public class ProduceRequest extends AbstractRequest {
         return partitionRecords;
     }
 
+    public void clearPartitionRecords() {
+        partitionRecords.clear();
+    }
+
     public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
         return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index fc41307..c213332 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -43,7 +43,7 @@ public class ProduceResponse extends AbstractRequestResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     public static final long INVALID_OFFSET = -1L;
-    private static final int DEFAULT_THROTTLE_TIME = 0;
+    public static final int DEFAULT_THROTTLE_TIME = 0;
 
     /**
      * Possible error code:

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
index 3fec60b..02cac80 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
@@ -31,7 +31,7 @@ public class RequestSend extends NetworkSend {
         this.body = body;
     }
 
-    private static ByteBuffer serialize(RequestHeader header, Struct body) {
+    public static ByteBuffer serialize(RequestHeader header, Struct body) {
         ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
         header.writeTo(buffer);
         body.writeTo(buffer);

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69431a5..789cca7 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -81,7 +81,7 @@ public class RequestResponseTest {
                 createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
                 createOffsetFetchResponse(),
                 createProduceRequest(),
-                createProduceRequest().getErrorResponse(0, new UnknownServerException()),
+                createProduceRequest().getErrorResponse(1, new UnknownServerException()),
                 createProduceResponse(),
                 createStopReplicaRequest(),
                 createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index beb5a6f..48818c3 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -20,12 +20,14 @@ package kafka.coordinator
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.utils.CoreUtils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field}
 import org.apache.kafka.common.protocol.types.Type.STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
 import org.apache.kafka.common.protocol.types.Type.BYTES
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 
 import kafka.utils._
@@ -35,7 +37,6 @@ import kafka.log.FileMessageSet
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.tools.MessageFormatter
-import kafka.api.ProducerResponseStatus
 import kafka.server.ReplicaManager
 
 import scala.collection._
@@ -47,8 +48,8 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.core.Gauge
 
 
-case class DelayedStore(messageSet: Map[TopicAndPartition, MessageSet],
-                        callback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+                        callback: Map[TopicPartition, PartitionResponse] => Unit)
 
 class GroupMetadataManager(val brokerId: Int,
                            val config: OffsetConfig,
@@ -171,7 +172,7 @@ class GroupMetadataManager(val brokerId: Int,
       bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment)
     )
 
-    val groupMetadataPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
+    val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
 
     val groupMetadataMessageSet = Map(groupMetadataPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
@@ -179,7 +180,7 @@ class GroupMetadataManager(val brokerId: Int,
     val generationId = group.generationId
 
     // set the callback function to insert the created group into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
       // the append response should only contain the topics partition
       if (responseStatus.size != 1 || ! responseStatus.contains(groupMetadataPartition))
         throw new IllegalStateException("Append status %s should only have one partition %s"
@@ -190,30 +191,30 @@ class GroupMetadataManager(val brokerId: Int,
       val status = responseStatus(groupMetadataPartition)
 
       var responseCode = Errors.NONE.code
-      if (status.error != Errors.NONE.code) {
+      if (status.errorCode != Errors.NONE.code) {
         debug("Metadata from group %s with generation %d failed when appending to log due to %s"
-          .format(group.groupId, generationId, Errors.forCode(status.error).exceptionName))
+          .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
 
         // transform the log append error code to the corresponding the commit status error code
-        responseCode = if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
+        responseCode = if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
           Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-        } else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code) {
+        } else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) {
           Errors.NOT_COORDINATOR_FOR_GROUP.code
-        } else if (status.error == Errors.REQUEST_TIMED_OUT.code) {
+        } else if (status.errorCode == Errors.REQUEST_TIMED_OUT.code) {
           Errors.REBALANCE_IN_PROGRESS.code
-        } else if (status.error == Errors.MESSAGE_TOO_LARGE.code
-          || status.error == Errors.RECORD_LIST_TOO_LARGE.code
-          || status.error == Errors.INVALID_FETCH_SIZE.code) {
+        } else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+          || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+          || status.errorCode == Errors.INVALID_FETCH_SIZE.code) {
 
           error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
-            .format(group.groupId, generationId, Errors.forCode(status.error).exceptionName))
+            .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
 
           Errors.UNKNOWN.code
         } else {
           error("Appending metadata message for group %s generation %d failed due to unexpected error: %s"
-            .format(group.groupId, generationId, status.error))
+            .format(group.groupId, generationId, status.errorCode))
 
-          status.error
+          status.errorCode
         }
       }
 
@@ -254,13 +255,13 @@ class GroupMetadataManager(val brokerId: Int,
       )
     }.toSeq
 
-    val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
+    val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
 
     // set the callback function to insert offsets into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
       // the append response should only contain the topics partition
       if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
         throw new IllegalStateException("Append status %s should only have one partition %s"
@@ -271,26 +272,26 @@ class GroupMetadataManager(val brokerId: Int,
       val status = responseStatus(offsetTopicPartition)
 
       val responseCode =
-        if (status.error == Errors.NONE.code) {
+        if (status.errorCode == Errors.NONE.code) {
           filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
             putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
           }
           Errors.NONE.code
         } else {
           debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
-            .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.error).exceptionName))
+            .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
 
           // transform the log append error code to the corresponding the commit status error code
-          if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+          if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
             Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-          else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code)
+          else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
             Errors.NOT_COORDINATOR_FOR_GROUP.code
-          else if (status.error == Errors.MESSAGE_TOO_LARGE.code
-            || status.error == Errors.RECORD_LIST_TOO_LARGE.code
-            || status.error == Errors.INVALID_FETCH_SIZE.code)
+          else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+            || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+            || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
             Errors.INVALID_COMMIT_OFFSET_SIZE.code
           else
-            status.error
+            status.errorCode
         }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 998f51a..f0d599d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -19,18 +19,17 @@ package kafka.network
 
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.security.Principal
+import java.util.HashMap
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{Logging, SystemTime}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
+import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.log4j.Logger
 
@@ -39,12 +38,9 @@ object RequestChannel extends Logging {
   val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
-    val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
-    byteBuffer.putShort(ApiKeys.PRODUCE.id)
-    emptyProducerRequest.writeTo(byteBuffer)
-    byteBuffer.rewind()
-    byteBuffer
+    val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, "", 0)
+    val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, ByteBuffer]())
+    RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct)
   }
 
   case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)
@@ -66,8 +62,7 @@ object RequestChannel extends Logging {
     // request types should only use the client-side versions which are parsed with
     // o.a.k.common.requests.AbstractRequest.getRequest()
     private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
-      Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
-        ApiKeys.FETCH.id -> FetchRequest.readFrom,
+      Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
         ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
         ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
         ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index f005019..072a658 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
 
 /**
  * Keys used for delayed operation metrics recording
@@ -33,6 +34,8 @@ object DelayedOperationKey {
 /* used by delayed-produce and delayed-fetch operations */
 case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
 
+  def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
+
   def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
 
   override def keyLabel = "%s-%d".format(topic, partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index c228807..be1be4f 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,26 +21,26 @@ package kafka.server
 import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Meter
-import kafka.api.ProducerResponseStatus
-import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 
 import scala.collection._
 
-case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) {
+case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
   @volatile var acksPending = false
 
   override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]"
-    .format(acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+    .format(acksPending, responseStatus.errorCode, responseStatus.baseOffset, requiredOffset)
 }
 
 /**
  * The produce metadata maintained by the delayed produce operation
  */
 case class ProduceMetadata(produceRequiredAcks: Short,
-                           produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) {
+                           produceStatus: Map[TopicPartition, ProducePartitionStatus]) {
 
   override def toString = "[requiredAcks: %d, partitionStatus: %s]"
     .format(produceRequiredAcks, produceStatus)
@@ -53,20 +53,20 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
   extends DelayedOperation(delayMs) {
 
   // first update the acks pending variable according to the error code
-  produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
-    if (status.responseStatus.error == Errors.NONE.code) {
+  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
+    if (status.responseStatus.errorCode == Errors.NONE.code) {
       // Timeout error state will be cleared when required acks are received
       status.acksPending = true
-      status.responseStatus.error = Errors.REQUEST_TIMED_OUT.code
+      status.responseStatus.errorCode = Errors.REQUEST_TIMED_OUT.code
     } else {
       status.acksPending = false
     }
 
-    trace("Initial partition status for %s is %s".format(topicAndPartition, status))
+    trace("Initial partition status for %s is %s".format(topicPartition, status))
   }
 
   /**
@@ -97,11 +97,11 @@ class DelayedProduce(delayMs: Long,
         if (errorCode != Errors.NONE.code) {
           // Case B.1
           status.acksPending = false
-          status.responseStatus.error = errorCode
+          status.responseStatus.errorCode = errorCode
         } else if (hasEnough) {
           // Case B.2
           status.acksPending = false
-          status.responseStatus.error = Errors.NONE.code
+          status.responseStatus.errorCode = Errors.NONE.code
         }
       }
     }
@@ -134,14 +134,14 @@ object DelayedProduceMetrics extends KafkaMetricsGroup {
 
   private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
 
-  private val partitionExpirationMeterFactory = (key: TopicAndPartition) =>
+  private val partitionExpirationMeterFactory = (key: TopicPartition) =>
     newMeter("ExpiresPerSec",
              "requests",
              TimeUnit.SECONDS,
              tags = Map("topic" -> key.topic, "partition" -> key.partition.toString))
-  private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
+  private val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
 
-  def recordExpiration(partition: TopicAndPartition) {
+  def recordExpiration(partition: TopicPartition) {
     aggregateExpirationMeter.mark()
     partitionExpirationMeters.getAndMaybePut(partition).mark()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f7d6be9..e48df90 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
 import kafka.log._
-import kafka.message.MessageSet
+import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
@@ -35,11 +35,12 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
 ClusterAuthorizationException}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ProtoUtils, ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
 LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse}
+StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse}
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
 
@@ -109,7 +110,6 @@ class KafkaApis(val requestChannel: RequestChannel,
 
           error("Error when handling request %s".format(request.body), e)
         }
-
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
@@ -315,44 +315,43 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle a produce request
    */
   def handleProducerRequest(request: RequestChannel.Request) {
-    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
-    val numBytesAppended = produceRequest.sizeInBytes
+    val produceRequest = request.body.asInstanceOf[ProduceRequest]
+    val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
 
-    val (authorizedRequestInfo, unauthorizedRequestInfo) =  produceRequest.data.partition  {
-      case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
+      case (topicPartition, _) => authorize(request.session, Write, new Resource(Topic, topicPartition.topic))
     }
 
     // the callback for sending a produce response
-    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
+      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
 
       var errorInResponse = false
 
-      mergedResponseStatus.foreach { case (topicAndPartition, status) =>
-        if (status.error != Errors.NONE.code) {
+      mergedResponseStatus.foreach { case (topicPartition, status) =>
+        if (status.errorCode != Errors.NONE.code) {
           errorInResponse = true
           debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-            produceRequest.correlationId,
-            produceRequest.clientId,
-            topicAndPartition,
-            Errors.forCode(status.error).exceptionName))
+            request.header.correlationId,
+            request.header.clientId,
+            topicPartition,
+            Errors.forCode(status.errorCode).exceptionName))
         }
       }
 
       def produceResponseCallback(delayTimeMs: Int) {
-
-        if (produceRequest.requiredAcks == 0) {
+        if (produceRequest.acks == 0) {
           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
           // the request, since no response is expected by the producer, the server will close socket server so that
           // the producer client will know that some error has happened and will refresh its metadata
           if (errorInResponse) {
-            val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
-              topicAndPartition -> Errors.forCode(status.error).exceptionName
+            val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
+              topicPartition -> Errors.forCode(status.errorCode).exceptionName
             }.mkString(", ")
             info(
-              s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
-                s"from client id ${produceRequest.clientId} with ack=0\n" +
+              s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
+                s"from client id ${request.header.clientId} with ack=0\n" +
                 s"Topic and partition to exceptions: $exceptionsSummary"
             )
             requestChannel.closeConnection(request.processor, request)
@@ -360,41 +359,51 @@ class KafkaApis(val requestChannel: RequestChannel,
             requestChannel.noOperation(request.processor, request)
           }
         } else {
-          val response = ProducerResponse(produceRequest.correlationId,
-                                          mergedResponseStatus,
-                                          produceRequest.versionId,
-                                          delayTimeMs)
-          requestChannel.sendResponse(new RequestChannel.Response(request,
-                                                                  new RequestOrResponseSend(request.connectionId,
-                                                                                            response)))
+          val respHeader = new ResponseHeader(request.header.correlationId)
+          val respBody = request.header.apiVersion match {
+            case 0 => new ProduceResponse(mergedResponseStatus.asJava)
+            case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
+            // This case shouldn't happen unless a new version of ProducerRequest is added without
+            // updating this part of the code to handle it properly.
+            case _ => throw new IllegalArgumentException("Version %d of ProducerRequest is not handled. Code must be updated."
+              .format(request.header.apiVersion))
+          }
+
+          requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
         }
       }
 
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
-      quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(produceRequest.clientId,
-                                                                   numBytesAppended,
-                                                                   produceResponseCallback)
+      quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(
+        request.header.clientId,
+        numBytesAppended,
+        produceResponseCallback)
     }
 
     if (authorizedRequestInfo.isEmpty)
       sendResponseCallback(Map.empty)
     else {
-      val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
+      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
+
+      // Convert ByteBuffer to ByteBufferMessageSet
+      val authorizedMessagesPerPartition = authorizedRequestInfo.map {
+        case (topicPartition, buffer) => (topicPartition, new ByteBufferMessageSet(buffer))
+      }
 
       // call the replica manager to append messages to the replicas
       replicaManager.appendMessages(
-        produceRequest.ackTimeoutMs.toLong,
-        produceRequest.requiredAcks,
+        produceRequest.timeout.toLong,
+        produceRequest.acks,
         internalTopicsAllowed,
-        authorizedRequestInfo,
+        authorizedMessagesPerPartition,
         sendResponseCallback)
 
       // if the request is put into the purgatory, it will have a held reference
       // and hence cannot be garbage collected; hence we clear its data here in
       // order to let GC re-claim its memory since it is already appended to log
-      produceRequest.emptyData()
+      produceRequest.clearPartitionRecords()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d1e549d..0ffb0e3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.requests.StopReplicaRequest
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
 
 import scala.collection._
@@ -320,19 +321,19 @@ class ReplicaManager(val config: KafkaConfig,
   def appendMessages(timeout: Long,
                      requiredAcks: Short,
                      internalTopicsAllowed: Boolean,
-                     messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
+                     messagesPerPartition: Map[TopicPartition, MessageSet],
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
 
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = SystemTime.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
-      val produceStatus = localProduceResults.map { case (topicAndPartition, result) =>
-        topicAndPartition ->
+      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
+        topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
+                  new PartitionResponse(result.errorCode, result.info.firstOffset)) // response status
       }
 
       if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
@@ -359,7 +360,7 @@ class ReplicaManager(val config: KafkaConfig,
       val responseStatus = messagesPerPartition.map {
         case (topicAndPartition, messageSet) =>
           (topicAndPartition ->
-                  ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code,
+                  new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
                     LogAppendInfo.UnknownLogAppendInfo.firstOffset))
       }
       responseCallback(responseStatus)
@@ -371,8 +372,8 @@ class ReplicaManager(val config: KafkaConfig,
   // 1. required acks = -1
   // 2. there is data to append
   // 3. at least one partition append was successful (fewer errors than partitions)
-  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                                       localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = {
+  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicPartition, MessageSet],
+                                       localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
     requiredAcks == -1 &&
     messagesPerPartition.size > 0 &&
     localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
@@ -386,26 +387,26 @@ class ReplicaManager(val config: KafkaConfig,
    * Append the messages to the local replica logs
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
-                               messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                               requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = {
+                               messagesPerPartition: Map[TopicPartition, MessageSet],
+                               requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
     trace("Append [%s] to local log ".format(messagesPerPartition))
-    messagesPerPartition.map { case (topicAndPartition, messages) =>
-      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).totalProduceRequestRate.mark()
+    messagesPerPartition.map { case (topicPartition, messages) =>
+      BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
-      if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
-        (topicAndPartition, LogAppendResult(
+      if (Topic.InternalTopics.contains(topicPartition.topic) && !internalTopicsAllowed) {
+        (topicPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
-          Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)))))
+          Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))
       } else {
         try {
-          val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition)
+          val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition)
           val info = partitionOpt match {
             case Some(partition) =>
               partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
             case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
-              .format(topicAndPartition, localBrokerId))
+              .format(topicPartition, localBrokerId))
           }
 
           val numAppendedMessages =
@@ -415,36 +416,36 @@ class ReplicaManager(val config: KafkaConfig,
               info.lastOffset - info.firstOffset + 1
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
-          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(messages.sizeInBytes)
           BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
-          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
+          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
           BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 
           trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
-            .format(messages.sizeInBytes, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
-          (topicAndPartition, LogAppendResult(info))
+            .format(messages.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
+          (topicPartition, LogAppendResult(info))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
           case e: KafkaStorageException =>
             fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
             Runtime.getRuntime.halt(1)
-            (topicAndPartition, null)
+            (topicPartition, null)
           case utpe: UnknownTopicOrPartitionException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
           case nle: NotLeaderForPartitionException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
           case mtle: RecordTooLargeException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
           case mstle: RecordBatchTooLargeException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
           case imse: CorruptRecordException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
           case t: Throwable =>
-            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+            BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-            error("Error processing append operation on partition %s".format(topicAndPartition), t)
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
+            error("Error processing append operation on partition %s".format(topicPartition), t)
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 0f702a0..7e6e765 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,16 +17,16 @@
 
 package kafka.coordinator
 
-
 import org.junit.Assert._
 
-import kafka.api.ProducerResponseStatus
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.message.MessageSet
 import kafka.server.{ReplicaManager, KafkaConfig}
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, IAnswer, EasyMock}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -824,16 +824,16 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                                   assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
-    val capturedArgument: Capture[Map[TopicAndPartition, ProducerResponseStatus] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicAndPartition, MessageSet]],
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
-          new ProducerResponseStatus(Errors.NONE.code, 0L)
+        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+          new PartitionResponse(Errors.NONE.code, 0L)
         )
       )})
     EasyMock.replay(replicaManager)
@@ -900,16 +900,16 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                             offsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicAndPartition, ProducerResponseStatus] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicAndPartition, MessageSet]],
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
-          new ProducerResponseStatus(Errors.NONE.code, 0L)
+        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+          new PartitionResponse(Errors.NONE.code, 0L)
         )
       )})
     EasyMock.replay(replicaManager)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 01f198e..b4ba027 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -21,24 +21,26 @@ package kafka.network;
 import java.net._
 import javax.net.ssl._
 import java.io._
+import java.util.HashMap
+import java.util.Random
+import java.nio.ByteBuffer
 
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.NetworkSend
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
 import org.apache.kafka.common.utils.SystemTime
-import org.junit.Assert._
-import org.junit._
-import org.scalatest.junit.JUnitSuite
-import java.util.Random
+
 import kafka.producer.SyncProducerConfig
-import kafka.api.ProducerRequest
-import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+
 import scala.collection.Map
 
 class SocketServerTest extends JUnitSuite {
@@ -56,10 +58,15 @@ class SocketServerTest extends JUnitSuite {
   val server = new SocketServer(config, metrics, new SystemTime)
   server.startup()
 
-  def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
+  def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) {
     val outgoing = new DataOutputStream(socket.getOutputStream)
-    outgoing.writeInt(request.length + 2)
-    outgoing.writeShort(id)
+    id match {
+      case Some(id) =>
+        outgoing.writeInt(request.length + 2)
+        outgoing.writeShort(id)
+      case None =>
+        outgoing.writeInt(request.length)
+    }
     outgoing.write(request)
     outgoing.flush()
   }
@@ -75,9 +82,11 @@ class SocketServerTest extends JUnitSuite {
   /* A simple request handler that just echos back the response */
   def processRequest(channel: RequestChannel) {
     val request = channel.receiveRequest
-    val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes)
-    request.requestObj.writeTo(byteBuffer)
+    val byteBuffer = ByteBuffer.allocate(request.header.sizeOf + request.body.sizeOf)
+    request.header.writeTo(byteBuffer)
+    request.body.writeTo(byteBuffer)
     byteBuffer.rewind()
+
     val send = new NetworkSend(request.connectionId, byteBuffer)
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
@@ -92,14 +101,17 @@ class SocketServerTest extends JUnitSuite {
   }
 
   private def producerRequestBytes: Array[Byte] = {
+    val apiKey: Short = 0
     val correlationId = -1
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
 
-    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+    val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
+    val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
+
+    val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf + emptyRequest.sizeOf)
+    emptyHeader.writeTo(byteBuffer)
     emptyRequest.writeTo(byteBuffer)
     byteBuffer.rewind()
     val serializedBytes = new Array[Byte](byteBuffer.remaining)
@@ -114,12 +126,12 @@ class SocketServerTest extends JUnitSuite {
     val serializedBytes = producerRequestBytes
 
     // Test PLAINTEXT socket
-    sendRequest(plainSocket, 0, serializedBytes)
+    sendRequest(plainSocket, serializedBytes)
     processRequest(server.requestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
 
     // Test TRACE socket
-    sendRequest(traceSocket, 0, serializedBytes)
+    sendRequest(traceSocket, serializedBytes)
     processRequest(server.requestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq)
   }
@@ -129,7 +141,7 @@ class SocketServerTest extends JUnitSuite {
     val tooManyBytes = new Array[Byte](server.config.socketRequestMaxBytes + 1)
     new Random().nextBytes(tooManyBytes)
     val socket = connect()
-    sendRequest(socket, 0, tooManyBytes)
+    sendRequest(socket, tooManyBytes, Some(0))
     try {
       receiveResponse(socket)
     } catch {
@@ -144,8 +156,8 @@ class SocketServerTest extends JUnitSuite {
     val traceSocket = connect(protocol = SecurityProtocol.TRACE)
     val bytes = new Array[Byte](40)
     // send a request first to make sure the connection has been picked up by the socket server
-    sendRequest(plainSocket, 0, bytes)
-    sendRequest(traceSocket, 0, bytes)
+    sendRequest(plainSocket, bytes, Some(0))
+    sendRequest(traceSocket, bytes, Some(0))
     processRequest(server.requestChannel)
 
     // make sure the sockets are open
@@ -157,14 +169,14 @@ class SocketServerTest extends JUnitSuite {
     // doing a subsequent send should throw an exception as the connection should be closed.
     // send a large chunk of bytes to trigger a socket flush
     try {
-      sendRequest(plainSocket, 0, largeChunkOfBytes)
+      sendRequest(plainSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed plain socket")
     } catch {
       case e: IOException => // expected
     }
 
     try {
-      sendRequest(traceSocket, 0, largeChunkOfBytes)
+      sendRequest(traceSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed trace socket")
     } catch {
       case e: IOException => // expected
@@ -188,7 +200,7 @@ class SocketServerTest extends JUnitSuite {
       "Failed to decrement connection count after close")
     val conn2 = connect()
     val serializedBytes = producerRequestBytes
-    sendRequest(conn2, 0, serializedBytes)
+    sendRequest(conn2, serializedBytes)
     val request = server.requestChannel.receiveRequest(2000)
     assertNotNull(request)
     conn2.close()
@@ -235,20 +247,22 @@ class SocketServerTest extends JUnitSuite {
       val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket]
       sslSocket.setNeedClientAuth(false)
 
+      val apiKey = ApiKeys.PRODUCE.id
       val correlationId = -1
       val clientId = SyncProducerConfig.DefaultClientId
       val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
       val ack = SyncProducerConfig.DefaultRequiredAcks
-      val emptyRequest =
-        new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
+      val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
+      val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
 
-      val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+      val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf() + emptyRequest.sizeOf())
+      emptyHeader.writeTo(byteBuffer)
       emptyRequest.writeTo(byteBuffer)
       byteBuffer.rewind()
       val serializedBytes = new Array[Byte](byteBuffer.remaining)
       byteBuffer.get(serializedBytes)
 
-      sendRequest(sslSocket, 0, serializedBytes)
+      sendRequest(sslSocket, serializedBytes)
       processRequest(overrideServer.requestChannel)
       assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
       sslSocket.close()
@@ -262,7 +276,7 @@ class SocketServerTest extends JUnitSuite {
   def testSessionPrincipal(): Unit = {
     val socket = connect()
     val bytes = new Array[Byte](40)
-    sendRequest(socket, 0, bytes)
+    sendRequest(socket, bytes, Some(0))
     assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest().session.principal)
     socket.close()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79cda047/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 724d4ac..32085f6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -17,21 +17,26 @@
 
 package kafka.server
 
-import kafka.api.{ProducerResponseStatus, SerializationTestUtils, ProducerRequest}
-import kafka.common.TopicAndPartition
+
+import kafka.api.SerializationTestUtils
+import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.{ZkUtils, MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.requests.ProduceRequest
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
 
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.junit.Test
 
 import scala.collection.Map
+import scala.collection.JavaConverters._
 
 class ReplicaManagerTest {
 
@@ -97,11 +102,15 @@ class ReplicaManagerTest {
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), Option(this.getClass.getName))
     try {
-      val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
-      def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
-        assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
+      def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
+        assert(responseStatus.values.head.errorCode == Errors.INVALID_REQUIRED_ACKS.code)
       }
-      rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
+      rm.appendMessages(
+        timeout = 0,
+        requiredAcks = 3,
+        internalTopicsAllowed = false,
+        messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
+        responseCallback = callback)
     } finally {
       rm.shutdown(false)
       metrics.close()


Mime
View raw message