kafka-commits mailing list archives

From: j...@apache.org
Subject: [1/5] kafka git commit: KAFKA-2066; Use client-side FetchRequest/FetchResponse on server
Date: Tue, 15 Nov 2016 00:46:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1f1d45006 -> 3b4c34794


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c6c8dbd..94ae419 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -19,13 +19,13 @@ package kafka.server
 
 import java.nio.ByteBuffer
 import java.lang.{Long => JLong, Short => JShort}
+import java.util
 import java.util.Properties
 
 import kafka.admin.{AdminUtils, RackAwareMode}
-import kafka.api._
+import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse, FetchResponsePartitionData}
 import kafka.cluster.Partition
-import kafka.server.QuotaFactory.{UnboundedQuota, QuotaManagers}
-import kafka.common
+import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
@@ -34,12 +34,13 @@ import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
-import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write, Delete}
+import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
-import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -105,14 +106,13 @@ class KafkaApis(val requestChannel: RequestChannel,
           error("Error when handling request %s".format(request.requestObj), e)
         } else {
           val response = request.body.getErrorResponse(request.header.apiVersion, e)
-          val respHeader = new ResponseHeader(request.header.correlationId)
 
           /* If request doesn't have a default error response, we just close the connection.
              For example, when produce request has acks set to 0 */
           if (response == null)
             requestChannel.closeConnection(request.processor, request)
           else
-            requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+            requestChannel.sendResponse(new Response(request, response))
 
           error("Error when handling request %s".format(request.body), e)
         }
@@ -142,7 +142,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      val responseHeader = new ResponseHeader(correlationId)
       val leaderAndIsrResponse =
         if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
           val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
@@ -152,7 +151,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
         }
 
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, leaderAndIsrResponse)))
+      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
     } catch {
       case e: KafkaStorageException =>
         fatal("Disk error during leadership change.", e)
@@ -166,7 +165,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.body.asInstanceOf[StopReplicaRequest]
 
-    val responseHeader = new ResponseHeader(request.header.correlationId)
     val response =
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
         val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
@@ -176,7 +174,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
       }
 
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, response))
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
@@ -197,8 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
       }
 
-    val responseHeader = new ResponseHeader(correlationId)
-    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, updateMetadataResponse)))
+    requestChannel.sendResponse(new Response(request, updateMetadataResponse))
   }
 
   def handleControlledShutdownRequest(request: RequestChannel.Request) {
@@ -228,9 +225,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
         (topicPartition, errorCode)
       }.toMap
-      val responseHeader = new ResponseHeader(header.correlationId)
-      val responseBody = new OffsetCommitResponse(results.asJava)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+      val response = new OffsetCommitResponse(results.asJava)
+      requestChannel.sendResponse(new RequestChannel.Response(request, response))
     } else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
         case (topicPartition, _) => {
@@ -260,9 +256,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
             }
           }
-        val responseHeader = new ResponseHeader(header.correlationId)
-        val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
-        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+        val response = new OffsetCommitResponse(combinedCommitStatus.asJava)
+        requestChannel.sendResponse(new RequestChannel.Response(request, response))
       }
 
       if (authorizedTopics.isEmpty)
@@ -330,7 +325,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
-    authorizer.map(_.authorize(session, operation, resource)).getOrElse(true)
+    authorizer.forall(_.authorize(session, operation, resource))
 
   /**
    * Handle a produce request
@@ -388,7 +383,6 @@ class KafkaApis(val requestChannel: RequestChannel,
             requestChannel.noOperation(request.processor, request)
           }
         } else {
-          val respHeader = new ResponseHeader(request.header.correlationId)
           val respBody = request.header.apiVersion match {
             case 0 => new ProduceResponse(mergedResponseStatus.asJava)
             case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
@@ -397,7 +391,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
           }
 
-          requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
+          requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
         }
       }
 
@@ -416,9 +410,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     else {
       val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
 
-      // Convert ByteBuffer to ByteBufferMessageSet
+      // Convert Records to ByteBufferMessageSet
       val authorizedMessagesPerPartition = authorizedRequestInfo.map {
-        case (topicPartition, buffer) => (topicPartition, new ByteBufferMessageSet(buffer))
+        case (topicPartition, records) => (topicPartition, new ByteBufferMessageSet(records.buffer))
       }
 
       // call the replica manager to append messages to the replicas
@@ -440,79 +434,85 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle a fetch request
    */
   def handleFetchRequest(request: RequestChannel.Request) {
-    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
+    val fetchRequest = request.body.asInstanceOf[FetchRequest]
+    val versionId = request.header.apiVersion
+    val clientId = request.header.clientId
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.requestInfo.partition {
-      case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicAndPartition.topic)) && metadataCache.contains(topicAndPartition.topic)
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition {
+      case (tp, _) => authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
     }
 
     val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-      case (topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic))
+      case (tp, _) => authorize(request.session, Read, new Resource(auth.Topic, tp.topic))
     }
 
-    val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map { case (tp, _) =>
-      (tp, FetchResponsePartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MessageSet.Empty))
+    val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
+      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MemoryRecords.EMPTY))
     }
-    val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map { case (tp, _) =>
-      (tp, FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty))
+
+    val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
+      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MemoryRecords.EMPTY))
     }
 
     // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchResponsePartitionData)]) {
-
-      val convertedPartitionData =
+      val convertedPartitionData = {
         // Need to down-convert message when consumer only takes magic value 0.
-        if (fetchRequest.versionId <= 1) {
-          responsePartitionData.map { case (tp, data) =>
-
-            // We only do down-conversion when:
-            // 1. The message format version configured for the topic is using magic value > 0, and
-            // 2. The message set contains message whose magic > 0
-            // This is to reduce the message format conversion as much as possible. The conversion will only occur
-            // when new message format is used for the topic and we see an old request.
-            // Please note that if the message format is changed from a higher version back to lower version this
-            // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
-            // without format down conversion.
-            val convertedData = if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
-              !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) {
-              trace(s"Down converting message to V0 for fetch request from ${fetchRequest.clientId}")
-              new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
-            } else data
-
-            tp -> convertedData
-          }
-        } else responsePartitionData
+        responsePartitionData.map { case (tp, data) =>
+
+          // We only do down-conversion when:
+          // 1. The message format version configured for the topic is using magic value > 0, and
+          // 2. The message set contains message whose magic > 0
+          // This is to reduce the message format conversion as much as possible. The conversion will only occur
+          // when new message format is used for the topic and we see an old request.
+          // Please note that if the message format is changed from a higher version back to lower version this
+          // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
+          // without format down conversion.
+          val convertedData = if (versionId <= 1 && replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
+            !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) {
+            trace(s"Down converting message to V0 for fetch request from $clientId")
+            new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
+          } else data
+
+          val records = convertedData.messages.asRecords
+          new TopicPartition(tp.topic, tp.partition) -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, records)
+        }
+      }
 
       val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
+      val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
+
+      mergedPartitionData.foreach { case (topicPartition, data) =>
+        if (data.errorCode != Errors.NONE.code)
+          debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
+            s"on partition $topicPartition failed due to ${Errors.forCode(data.errorCode).exceptionName}")
+
+        fetchedPartitionData.put(topicPartition, data)
 
-      mergedPartitionData.foreach { case (topicAndPartition, data) =>
-        if (data.error != Errors.NONE.code)
-          debug(s"Fetch request with correlation id ${fetchRequest.correlationId} from client ${fetchRequest.clientId} " +
-            s"on partition $topicAndPartition failed due to ${Errors.forCode(data.error).exceptionName}")
         // record the bytes out metrics only when the response is being sent
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
+        BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesOutRate.mark(data.records.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.records.sizeInBytes)
       }
 
+      val response = new FetchResponse(versionId, fetchedPartitionData, 0)
+
       def fetchResponseCallback(delayTimeMs: Int) {
-        trace(s"Sending fetch response to client ${fetchRequest.clientId} of " +
-          s"${convertedPartitionData.map { case (_, v) => v.messages.sizeInBytes }.sum} bytes")
-        val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData.toSeq, fetchRequest.versionId, delayTimeMs)
-        requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
+        trace(s"Sending fetch response to client $clientId of " +
+          s"${convertedPartitionData.map { case (_, v) => v.records.sizeInBytes }.sum} bytes")
+        val fetchResponse = if (delayTimeMs > 0) new FetchResponse(versionId, fetchedPartitionData, delayTimeMs) else response
+        requestChannel.sendResponse(new RequestChannel.Response(request, fetchResponse))
       }
 
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
       if (fetchRequest.isFromFollower) {
-        //We've already evaluated against the quota and are good to go. Just need to record it now.
-        val responseSize = sizeOfThrottledPartitions(fetchRequest, mergedPartitionData, quotas.leader)
+        // We've already evaluated against the quota and are good to go. Just need to record it now.
+        val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
         quotas.leader.record(responseSize)
         fetchResponseCallback(0)
       } else {
-        val responseSize = FetchResponse.responseSize(FetchResponse.batchByTopic(mergedPartitionData),
-          fetchRequest.versionId)
-        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, fetchRequest.clientId, responseSize, fetchResponseCallback)
+        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, response.sizeOf, fetchResponseCallback)
       }
     }
 
@@ -525,18 +525,23 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchRequest.replicaId,
         fetchRequest.minBytes,
         fetchRequest.maxBytes,
-        fetchRequest.versionId <= 2,
+        versionId <= 2,
         authorizedRequestInfo,
         replicationQuota(fetchRequest),
         sendResponseCallback)
     }
   }
 
-  private def sizeOfThrottledPartitions(fetchRequest: FetchRequest,
-                                        mergedPartitionData: Seq[(TopicAndPartition, FetchResponsePartitionData)],
+  private def sizeOfThrottledPartitions(versionId: Short,
+                                        fetchRequest: FetchRequest,
+                                        mergedPartitionData: Seq[(TopicPartition, FetchResponse.PartitionData)],
                                         quota: ReplicationQuotaManager): Int = {
-    val throttledPartitions = mergedPartitionData.filter { case (partition, _) => quota.isThrottled(partition) }
-    FetchResponse.responseSize(FetchRequest.batchByTopic(throttledPartitions), fetchRequest.versionId)
+    val partitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
+    mergedPartitionData.foreach { case (tp, data) =>
+      if (quota.isThrottled(TopicAndPartition(tp.topic(), tp.partition())))
+        partitionData.put(tp, data)
+    }
+    FetchResponse.sizeOf(versionId, partitionData)
   }
 
   def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
@@ -546,7 +551,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset request
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
-    val correlationId = request.header.correlationId
     val version = request.header.apiVersion()
 
     val mergedResponseMap =
@@ -555,10 +559,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       else
         handleOffsetRequestV1(request)
 
-    val responseHeader = new ResponseHeader(correlationId)
     val response = new ListOffsetResponse(mergedResponseMap.asJava, version)
-
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, response))
   }
 
   private def handleOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
@@ -711,7 +713,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     else
       offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 
-    for (i <- 0 until segsArray.length)
+    for (i <- segsArray.indices)
       offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
     if (lastSegmentHasSize)
       offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
@@ -837,7 +839,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
-      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
+      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic),
         java.util.Collections.emptyList()))
 
     // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
@@ -865,8 +867,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
       brokers.mkString(","), request.header.correlationId, request.header.clientId))
 
-    val responseHeader = new ResponseHeader(request.header.correlationId)
-
     val responseBody = new MetadataResponse(
       brokers.map(_.getNode(request.securityProtocol)).asJava,
       clusterId,
@@ -874,7 +874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       completeTopicMetadata.asJava,
       requestVersion
     )
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
   }
 
   /*
@@ -884,7 +884,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val header = request.header
     val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
 
-    val responseHeader = new ResponseHeader(header.correlationId)
     val offsetFetchResponse =
     // reject the request if not authorized to the group
     if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
@@ -933,16 +932,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, offsetFetchResponse)))
+    requestChannel.sendResponse(new Response(request, offsetFetchResponse))
   }
 
   def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
     val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
-    val responseHeader = new ResponseHeader(request.header.correlationId)
 
     if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
       val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
       val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
 
@@ -966,16 +964,14 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       trace("Sending consumer metadata %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }
   }
 
   def handleDescribeGroupRequest(request: RequestChannel.Request) {
     val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
-    val responseHeader = new ResponseHeader(request.header.correlationId)
 
-    val groups = describeRequest.groupIds().asScala.map {
-      case groupId =>
+    val groups = describeRequest.groupIds().asScala.map { groupId =>
         if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
           groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
         } else {
@@ -991,11 +987,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     }.toMap
 
     val responseBody = new DescribeGroupsResponse(groups.asJava)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
-    val responseHeader = new ResponseHeader(request.header.correlationId)
     val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
       ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
     } else {
@@ -1003,14 +998,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
       new ListGroupsResponse(error.code, allGroups.asJava)
     }
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
     import JavaConversions._
 
     val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
-    val responseHeader = new ResponseHeader(request.header.correlationId)
 
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
@@ -1020,7 +1014,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       trace("Sending join group response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
@@ -1032,7 +1026,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
         Map.empty[String, ByteBuffer])
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
       // let the coordinator to handle join-group
       val protocols = joinGroupRequest.groupProtocols().map(protocol =>
@@ -1057,8 +1051,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
       val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
-      val responseHeader = new ResponseHeader(request.header.correlationId)
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+      requestChannel.sendResponse(new Response(request, responseBody))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
@@ -1068,7 +1061,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         syncGroupRequest.groupId(),
         syncGroupRequest.generationId(),
         syncGroupRequest.memberId(),
-        syncGroupRequest.groupAssignment().mapValues(Utils.toArray(_)),
+        syncGroupRequest.groupAssignment().mapValues(Utils.toArray),
         sendResponseCallback
       )
     }
@@ -1076,19 +1069,18 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
-    val respHeader = new ResponseHeader(request.header.correlationId)
 
     // the callback for sending a heartbeat response
     def sendResponseCallback(errorCode: Short) {
       val response = new HeartbeatResponse(errorCode)
       trace("Sending heartbeat response %s for correlation id %d to client %s."
         .format(response, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, response))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
       val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
+      requestChannel.sendResponse(new Response(request, heartbeatResponse))
     }
     else {
       // let the coordinator to handle heartbeat
@@ -1102,19 +1094,18 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleLeaveGroupRequest(request: RequestChannel.Request) {
     val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
-    val respHeader = new ResponseHeader(request.header.correlationId)
 
     // the callback for sending a leave-group response
     def sendResponseCallback(errorCode: Short) {
       val response = new LeaveGroupResponse(errorCode)
       trace("Sending leave group response %s for correlation id %d to client %s."
                     .format(response, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, response))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
       val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
+      requestChannel.sendResponse(new Response(request, leaveGroupResponse))
     } else {
       // let the coordinator to handle leave-group
       coordinator.handleLeaveGroup(
@@ -1125,9 +1116,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    val respHeader = new ResponseHeader(request.header.correlationId)
     val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE.code, config.saslEnabledMechanisms)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, response))
   }
 
   def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -1137,12 +1127,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    val responseHeader = new ResponseHeader(request.header.correlationId)
     val responseBody = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
       ApiVersionsResponse.apiVersionsResponse
     else
       ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
   }
 
   def close() {
@@ -1154,10 +1143,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val createTopicsRequest = request.body.asInstanceOf[CreateTopicsRequest]
 
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
-      val respHeader = new ResponseHeader(request.header.correlationId)
       val responseBody = new CreateTopicsResponse(results.asJava)
       trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }
 
     if (!controller.isActive()) {
@@ -1205,14 +1193,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
       val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
           unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
-      val respHeader = new ResponseHeader(request.header.correlationId)
       val responseBody = new DeleteTopicsResponse(completeResults.asJava)
       trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }
 
     if (!controller.isActive()) {
-      val results = deleteTopicRequest.topics.asScala.map { case topic =>
+      val results = deleteTopicRequest.topics.asScala.map { topic =>
         (topic, Errors.NOT_CONTROLLER)
       }.toMap
       sendResponseCallback(results)

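Two of the non-mechanical changes in KafkaApis.scala above are worth unpacking. The first is the `authorize` helper, rewritten from `authorizer.map(_.authorize(session, operation, resource)).getOrElse(true)` to `authorizer.forall(_.authorize(session, operation, resource))`. The two forms are equivalent: on `None` (no authorizer configured) both return true, and on `Some` both apply the predicate. A minimal standalone sketch of that equivalence, with a plain `String => Boolean` predicate standing in for the real `Authorizer` interface:

    object ForallEquivalence extends App {
      def check(authorizer: Option[String => Boolean], principal: String): Boolean =
        authorizer.forall(_(principal))

      val noAuthorizer: Option[String => Boolean] = None           // nothing configured
      val strict: Option[String => Boolean] = Some(_ == "admin")

      assert(check(noAuthorizer, "anyone"))   // None => true: everything allowed
      assert(check(strict, "admin"))          // Some(p) => p("admin")
      assert(!check(strict, "guest"))         // Some(p) => p("guest")
      println("forall behaves like map(...).getOrElse(true)")
    }

The second is the down-conversion guard in `handleFetchRequest`: instead of branching on the request version before the per-partition loop, the patch folds `versionId <= 1` into the per-partition condition, so a message set is rewritten to magic value 0 only when the consumer speaks the old format, the topic is configured with a newer message format, and the fetched data actually contains newer-format messages. Reduced to its decision predicate (a sketch; `topicMagic` and `allMessagesAreV0` stand in for the `replicaManager.getMessageFormatVersion` and `isMagicValueInAllWrapperMessages` checks):

    object DownConversionGuard {
      // Convert only when all three conditions hold; otherwise pass data through.
      def needsDownConversion(versionId: Short,
                              topicMagic: Option[Byte],
                              allMessagesAreV0: Boolean): Boolean =
        versionId <= 1 && topicMagic.exists(_ > 0) && !allMessagesAreV0
    }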
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1c7f06a..4026a7e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,45 +17,37 @@
 
 package kafka.server
 
+import java.io.{File, IOException}
 import java.net.SocketTimeoutException
 import java.util
-
-import kafka.admin._
-import kafka.api.KAFKA_0_9_0
-import kafka.log.LogConfig
-import kafka.log.CleanerConfig
-import kafka.log.LogManager
 import java.util.concurrent._
-import atomic.{AtomicBoolean, AtomicInteger}
-import java.io.{File, IOException}
-import java.nio.charset.StandardCharsets
-import java.util.UUID
-import javax.xml.bind.DatatypeConverter
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
+import com.yammer.metrics.core.Gauge
+import kafka.admin.AdminUtils
+import kafka.api.KAFKA_0_9_0
+import kafka.cluster.{Broker, EndPoint}
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
+import kafka.controller.{ControllerStats, KafkaController}
+import kafka.coordinator.GroupCoordinator
+import kafka.log.{CleanerConfig, LogConfig, LogManager}
+import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
+import kafka.network.{BlockingChannel, SocketServer}
 import kafka.security.auth.Authorizer
 import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.clients.{ClientRequest, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.{ClusterResource, Node}
-import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
+import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
-import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse, RequestSend}
+import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.AppInfoParser
+import org.apache.kafka.common.{ClusterResource, Node}
 
-import scala.collection.{Map, mutable}
 import scala.collection.JavaConverters._
-import org.I0Itec.zkclient.ZkClient
-import kafka.controller.{ControllerStats, KafkaController}
-import kafka.cluster.{Broker, EndPoint}
-import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
-import kafka.network.{BlockingChannel, SocketServer}
-import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
-import com.yammer.metrics.core.Gauge
-import kafka.coordinator.GroupCoordinator
-import org.apache.kafka.common.internals.ClusterResourceListeners
-import collection.JavaConverters._
+import scala.collection.{Map, mutable}
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -433,12 +425,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
               // send the controlled shutdown request
               val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
-              val send = new RequestSend(node(prevController).idString, requestHeader,
-                new ControlledShutdownRequest(config.brokerId).toStruct)
-              val request = new ClientRequest(kafkaMetricsTime.milliseconds(), true, send, null)
-              val clientResponse = networkClient.blockingSendAndReceive(request)
+              val controlledShutdownRequest = new ControlledShutdownRequest(config.brokerId)
+              val request = new ClientRequest(node(prevController).idString, kafkaMetricsTime.milliseconds(), true,
+                requestHeader, controlledShutdownRequest, null)
+              val clientResponse = networkClient.blockingSendAndReceive(request, controlledShutdownRequest)
 
-              val shutdownResponse = new ControlledShutdownResponse(clientResponse.responseBody)
+              val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
               if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {
                 shutdownSucceeded = true
                 info("Controlled shutdown succeeded")
@@ -707,7 +699,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
     for(logDir <- logDirsWithoutMetaProps) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
-      checkpoint.write(new BrokerMetadata(brokerId))
+      checkpoint.write(BrokerMetadata(brokerId))
     }
   }
 

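The KafkaServer.scala hunk above shows the pattern this patch applies everywhere requests cross the network: call sites no longer build a `RequestSend` from a serialized struct; they hand `ClientRequest` the typed request body, and `blockingSendAndReceive` returns a response whose `responseBody` is already a typed object that the caller only narrows. A standalone sketch of that shape, with stand-in types rather than the real `NetworkClient` classes:

    final case class Header(correlationId: Int)
    final case class ShutdownRequest(brokerId: Int)
    final case class ShutdownResponse(errorCode: Short, partitionsRemaining: Set[Int])

    // Stand-in for the network layer: it owns serialization on both sides
    // and hands back a typed body, so the caller only downcasts.
    final class BlockingClient {
      def blockingSendAndReceive(header: Header, body: ShutdownRequest): AnyRef =
        ShutdownResponse(0, Set.empty)
    }

    object ControlledShutdownDemo extends App {
      val client = new BlockingClient
      val body = client.blockingSendAndReceive(Header(1), ShutdownRequest(brokerId = 0))
      val response = body.asInstanceOf[ShutdownResponse]
      println(response.errorCode == 0 && response.partitionsRemaining.isEmpty) // true
    }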
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6f4c589..84c3feb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -29,15 +29,16 @@ import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
-import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse, RequestSend}
+import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.{JavaConverters, Map}
-import JavaConverters._
+import scala.collection.Map
+import scala.collection.JavaConverters._
 
 class ReplicaFetcherThread(name: String,
                            fetcherId: Int,
@@ -139,7 +140,7 @@ class ReplicaFetcherThread(name: String,
       if (logger.isTraceEnabled)
         trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
           .format(replica.brokerId, topic, partitionId, followerHighWatermark))
-      if (quota.isThrottled(new TopicAndPartition(topic, partitionId)))
+      if (quota.isThrottled(TopicAndPartition(topic, partitionId)))
         quota.record(messageSet.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
@@ -236,7 +237,8 @@ class ReplicaFetcherThread(name: String,
 
   protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
     val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
-    new FetchResponse(clientResponse.responseBody).responseData.asScala.toSeq.map { case (key, value) =>
+    val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
+    fetchResponse.responseData.asScala.toSeq.map { case (key, value) =>
       key -> new PartitionData(value)
     }
   }
@@ -248,9 +250,8 @@ class ReplicaFetcherThread(name: String,
       if (!networkClient.blockingReady(sourceNode, socketTimeout)(time))
         throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
       else {
-        val send = new RequestSend(sourceBroker.id.toString, header, request.toStruct)
-        val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
-        networkClient.blockingSendAndReceive(clientRequest)(time)
+        val clientRequest = new ClientRequest(sourceBroker.id.toString, time.milliseconds(), true, header, request, null)
+        networkClient.blockingSendAndReceive(clientRequest, request)(time)
       }
     }
     catch {
@@ -271,7 +272,7 @@ class ReplicaFetcherThread(name: String,
         (new ListOffsetRequest(consumerId, partitions.asJava), 0)
       }
     val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, Some(apiVersion.toShort), request)
-    val response = new ListOffsetResponse(clientResponse.responseBody)
+    val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
     val partitionData = response.responseData.get(topicPartition)
     Errors.forCode(partitionData.errorCode) match {
       case Errors.NONE =>
@@ -322,7 +323,10 @@ object ReplicaFetcherThread {
 
     def errorCode: Short = underlying.errorCode
 
-    def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet(underlying.recordSet)
+    def toByteBufferMessageSet: ByteBufferMessageSet = {
+      val buffer = underlying.records.asInstanceOf[MemoryRecords].buffer
+      new ByteBufferMessageSet(buffer)
+    }
 
     def highWatermark: Long = underlying.highWatermark
 

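The `toByteBufferMessageSet` change above bridges the new records API and the legacy message-set API without copying: both views can share one underlying `ByteBuffer`, so the conversion is a wrap rather than a copy. A sketch with stand-in types mirroring `MemoryRecords.buffer` and `new ByteBufferMessageSet(buffer)`:

    import java.nio.ByteBuffer

    final class Records(val buffer: ByteBuffer)          // stand-in for MemoryRecords
    final class LegacyMessageSet(val buffer: ByteBuffer) // stand-in for ByteBufferMessageSet

    object BufferBridgeDemo extends App {
      val records = new Records(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
      val legacy  = new LegacyMessageSet(records.buffer) // wrap, not copy
      println(legacy.buffer eq records.buffer)           // true: same buffer shared
    }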
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b43695a..febe8ad 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.utils.{Time => JTime}
 
 import scala.collection._
@@ -457,7 +458,7 @@ class ReplicaManager(val config: KafkaConfig,
                     fetchMinBytes: Int,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
-                    fetchInfos: Seq[(TopicAndPartition, PartitionFetchInfo)],
+                    fetchInfos: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota = UnboundedQuota,
                     responseCallback: Seq[(TopicAndPartition, FetchResponsePartitionData)] => Unit) {
     val isFromFollower = replicaId >= 0
@@ -498,7 +499,7 @@ class ReplicaManager(val config: KafkaConfig,
       // construct the fetch results from the read results
       val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
         val fetchInfo = fetchInfos.collectFirst {
-          case (tp, v) if tp == topicAndPartition => v
+          case (tp, v) if TopicAndPartition(tp.topic, tp.partition) == topicAndPartition => v
         }.getOrElse(sys.error(s"Partition $topicAndPartition not found in fetchInfos"))
         (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
       }
@@ -524,19 +525,21 @@ class ReplicaManager(val config: KafkaConfig,
                        readOnlyCommitted: Boolean,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
-                       readPartitionInfo: Seq[(TopicAndPartition, PartitionFetchInfo)],
+                       readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                        quota: ReplicaQuota): Seq[(TopicAndPartition, LogReadResult)] = {
 
-    def read(tp: TopicAndPartition, fetchInfo: PartitionFetchInfo, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
-      val TopicAndPartition(topic, partition) = tp
-      val PartitionFetchInfo(offset, fetchSize) = fetchInfo
+    def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
+      val topic = tp.topic
+      val partition = tp.partition
+      val offset = fetchInfo.offset
+      val partitionFetchSize = fetchInfo.maxBytes
 
       BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
 
       try {
-        trace(s"Fetching log segment for partition $tp, offset ${offset}, partition fetch size ${fetchSize}, " +
-          s"remaining response limit ${limitBytes}" +
+        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
+          s"remaining response limit $limitBytes" +
           (if (minOneMessage) s", ignoring response/partition size limits" else ""))
 
         // decide whether to only fetch from leader
@@ -560,13 +563,13 @@ class ReplicaManager(val config: KafkaConfig,
         val initialLogEndOffset = localReplica.logEndOffset
         val logReadInfo = localReplica.log match {
           case Some(log) =>
-            val adjustedFetchSize = math.min(fetchSize, limitBytes)
+            val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)
 
             // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
             val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
 
             // If the partition is being throttled, simply return an empty set.
-            if (shouldLeaderThrottle(quota, tp, replicaId))
+            if (shouldLeaderThrottle(quota, TopicAndPartition(tp.topic, tp.partition), replicaId))
               FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
             // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
             // progress in such cases and don't need to report a `RecordTooLargeException`
@@ -581,7 +584,7 @@ class ReplicaManager(val config: KafkaConfig,
 
         val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0
 
-        LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, readToEndOfLog, None)
+        LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, partitionFetchSize, readToEndOfLog, None)
       } catch {
         // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
         // is supposed to indicate un-expected failure of a broker in handling a fetch request
@@ -589,12 +592,12 @@ class ReplicaManager(val config: KafkaConfig,
                  _: NotLeaderForPartitionException |
                  _: ReplicaNotAvailableException |
                  _: OffsetOutOfRangeException) =>
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(e))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, partitionFetchSize, false, Some(e))
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
           error(s"Error processing fetch operation on partition ${tp}, offset $offset", e)
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(e))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, partitionFetchSize, false, Some(e))
       }
     }
 
@@ -608,7 +611,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (messageSetSize > 0)
         minOneMessage = false
       limitBytes = math.max(0, limitBytes - messageSetSize)
-      result += (tp -> readResult)
+      result += (TopicAndPartition(tp.topic, tp.partition) -> readResult)
     }
     result
   }

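The `readFromLocalLog` rework above keeps the byte-budget behaviour of the fetch path while switching the key type: partitions are read in request order against a shared `limitBytes`, and only while `minOneMessage` is still true may a read overshoot the budget, which guarantees the response carries at least one message even if that message is larger than `fetchMaxBytes`. A simulation of that loop (a sketch only, assuming `minOneMessage` starts true as for requests without a hard byte limit; the real `read` consults log segments and offset metadata rather than a size table):

    object ByteBudgetDemo extends App {
      // Simulated read: honor the adjusted limit unless minOneMessage allows
      // returning a single oversized message.
      def read(messageBytes: Int, adjustedFetchSize: Int, minOneMessage: Boolean): Int =
        if (messageBytes <= adjustedFetchSize || minOneMessage) messageBytes else 0

      def readAll(messageSizes: Seq[Int], fetchMaxBytes: Int): Seq[Int] = {
        var limitBytes = fetchMaxBytes
        var minOneMessage = true
        messageSizes.map { size =>
          val bytes = read(size, math.min(size, limitBytes), minOneMessage)
          if (bytes > 0) minOneMessage = false        // budget applies strictly from now on
          limitBytes = math.max(0, limitBytes - bytes)
          bytes
        }
      }

      println(readAll(Seq(800, 300, 300), fetchMaxBytes = 500)) // List(800, 0, 0)
      println(readAll(Seq(200, 200, 200), fetchMaxBytes = 500)) // List(200, 200, 0)
    }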
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index ba2fb92..d4c82d8 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -116,7 +116,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
   override def isThrottled(topicPartition: TopicAndPartition): Boolean = {
     val partitions = throttledPartitions.get(topicPartition.topic)
     if (partitions != null)
-      partitions.contains(topicPartition.partition) || (partitions eq AllReplicas)
+      (partitions eq AllReplicas) || partitions.contains(topicPartition.partition)
     else false
   }
 

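The one-line reorder above does not change what `isThrottled` returns, since both operands are pure; it changes what gets evaluated first. `eq` is a reference-identity check, so testing the `AllReplicas` sentinel before `contains` short-circuits ahead of a potentially long scan of the partition list. A standalone sketch of the idea, with a stand-in sentinel:

    object ShortCircuitDemo extends App {
      val AllReplicas: Seq[Int] = Seq(-1)  // stand-in sentinel meaning "throttle everything"

      def isThrottled(partitions: Seq[Int], partition: Int): Boolean =
        (partitions eq AllReplicas) || partitions.contains(partition)

      println(isThrottled(AllReplicas, 42))  // true: identity check, no scan
      println(isThrottled(Seq(1, 2, 3), 2))  // true: found by contains
      println(isThrottled(Seq(1, 2, 3), 42)) // false
    }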
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 9b0828f..220b6e1 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -18,12 +18,13 @@
 package kafka.utils
 
 import java.io.IOException
+
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.requests.AbstractRequest
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
-
 import org.apache.kafka.common.utils.{Time => JTime}
 
 object NetworkClientBlockingOps {
@@ -102,22 +103,19 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
    * care.
    */
-  def blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): ClientResponse = {
+  def blockingSendAndReceive(request: ClientRequest, body: AbstractRequest)(implicit time: JTime): ClientResponse = {
     client.send(request, time.milliseconds())
 
     pollContinuously { responses =>
       val response = responses.find { response =>
-        response.request.request.header.correlationId == request.request.header.correlationId
+        response.requestHeader.correlationId == request.header.correlationId
       }
       response.foreach { r =>
-        if (r.wasDisconnected) {
-          val destination = request.request.destination
-          throw new IOException(s"Connection to $destination was disconnected before the response was read")
-        }
+        if (r.wasDisconnected)
+          throw new IOException(s"Connection to ${request.destination} was disconnected before the response was read")
       }
       response
     }
-
   }
 
   /**

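With `RequestSend` gone, `blockingSendAndReceive` above matches responses by comparing `response.requestHeader.correlationId` against `request.header.correlationId`, and raises an `IOException` if the matching response records a disconnect. The polling-and-matching core, as a standalone sketch with stand-in types for the `NetworkClient` classes:

    import java.io.IOException

    final case class Resp(correlationId: Int, wasDisconnected: Boolean, destination: String)

    object CorrelationDemo extends App {
      // Stand-in for pollContinuously: drain response batches until one
      // carries the correlation id of the request we sent.
      def awaitResponse(sentId: Int, batches: Iterator[Seq[Resp]]): Resp = {
        val matches = batches.flatMap(_.find(_.correlationId == sentId))
        if (!matches.hasNext)
          throw new IOException(s"No response with correlation id $sentId")
        val r = matches.next()
        if (r.wasDisconnected)
          throw new IOException(s"Connection to ${r.destination} was disconnected before the response was read")
        r
      }

      val batches = Iterator(
        Seq(Resp(7, wasDisconnected = false, "broker-1")), // wrong id: keep polling
        Seq(Resp(9, wasDisconnected = false, "broker-1"))) // match for our request
      println(awaitResponse(9, batches))
    }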
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 8502ae0..6eebbbf 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -38,9 +38,9 @@ import org.junit.{After, Assert, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
-
 import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
+import org.apache.kafka.common.record.MemoryRecords
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
 
@@ -55,7 +55,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val correlationId = 0
   val clientId = "client-Id"
   val tp = new TopicPartition(topic, part)
-  val topicAndPartition = new TopicAndPartition(topic, part)
+  val topicAndPartition = TopicAndPartition(topic, part)
   val group = "my-group"
   val topicResource = new Resource(Topic, topic)
   val groupResource = new Resource(Group, group)
@@ -181,7 +181,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createProduceRequest = {
-    new requests.ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava)
+    new requests.ProduceRequest(1, 5000, collection.mutable.Map(tp -> MemoryRecords.readableRecords(ByteBuffer.wrap("test".getBytes))).asJava)
   }
 
   private def createFetchRequest = {
@@ -766,10 +766,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                                             resources: Set[ResourceType],
                                             isAuthorized: Boolean,
                                             isAuthorizedTopicDescribe: Boolean,
-                                            topicExists: Boolean = true): AbstractRequestResponse = {
+                                            topicExists: Boolean = true): AbstractResponse = {
     val resp = send(request, apiKey)
-    val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
-    val error = Errors.forCode(RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response))
+    val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractResponse]
+    val error = Errors.forCode(RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractResponse) => Short](response))
 
     val authorizationErrorCodes = resources.flatMap { resourceType =>
       if (resourceType == Topic) {
@@ -786,13 +786,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     if (topicExists)
       if (isAuthorized)
-        assertFalse(s"${apiKey} should be allowed. Found unexpected authorization error $error", authorizationErrorCodes.contains(error))
+        assertFalse(s"$apiKey should be allowed. Found unexpected authorization error $error", authorizationErrorCodes.contains(error))
       else
-        assertTrue(s"${apiKey} should be forbidden. Found error $error but expected one of $authorizationErrorCodes", authorizationErrorCodes.contains(error))
+        assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrorCodes", authorizationErrorCodes.contains(error))
     else if (resources == Set(Topic))
-      assertEquals(s"${apiKey} had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
+      assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
     else
-      assertNotEquals(s"${apiKey} had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
+      assertNotEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
 
     response
   }

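The recurring change in this and several test diffs below (SocketServerTest, ProduceRequestTest) is that ProduceRequest now carries MemoryRecords instead of raw ByteBuffer payloads; an existing buffer is wrapped once with MemoryRecords.readableRecords. A minimal sketch mirroring the test code above; the topic name and the acks/timeout values are illustrative:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.record.MemoryRecords
    import org.apache.kafka.common.requests.ProduceRequest
    import scala.collection.JavaConverters._

    val tp = new TopicPartition("test-topic", 0)
    // Before: Map(tp -> ByteBuffer.wrap(bytes)); now the buffer is wrapped:
    val records = MemoryRecords.readableRecords(ByteBuffer.wrap("test".getBytes))
    val request = new ProduceRequest(1, 5000, Map(tp -> records).asJava)
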
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index 0d86128..476a577 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -103,7 +103,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
     var remaining = messageSetSize
     var iterations = 0
     while (remaining > 0) {
-      remaining -= messageSet.writeTo(channel, messageSetSize - remaining, remaining)
+      remaining -= messageSet.asRecords.writeTo(channel, messageSetSize - remaining, remaining).toInt
       iterations += 1
     }
 
@@ -112,7 +112,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   }
 
   def checkWriteToWithMessageSet(messageSet: MessageSet) {
-    checkWriteWithMessageSet(messageSet, messageSet.writeTo(_, 0, messageSet.sizeInBytes))
+    checkWriteWithMessageSet(messageSet, messageSet.asRecords.writeTo(_, 0, messageSet.sizeInBytes))
   }
 
   def checkWriteWithMessageSet(set: MessageSet, write: GatheringByteChannel => Long) {

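MessageSet writes now go through the Records view, and asRecords.writeTo returns a Long, hence the toInt above. The write loop in condensed form, as a sketch; the drain helper is hypothetical and not part of the patch:

    import java.nio.channels.GatheringByteChannel
    import kafka.message.MessageSet

    // Write a whole MessageSet to a channel via its Records view, chunk by
    // chunk; each writeTo call reports the bytes written as a Long.
    def drain(messageSet: MessageSet, channel: GatheringByteChannel): Int = {
      val total = messageSet.sizeInBytes
      var remaining = total
      while (remaining > 0)
        remaining -= messageSet.asRecords.writeTo(channel, total - remaining, remaining).toInt
      total
    }
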
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index da6f260..f7b5da5 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -31,10 +31,9 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
 import org.apache.kafka.common.utils.SystemTime
-
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-
+import org.apache.kafka.common.record.MemoryRecords
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -117,7 +116,7 @@ class SocketServerTest extends JUnitSuite {
     val ack = 0: Short
 
     val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
-    val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
+    val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]())
 
     val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf + emptyRequest.sizeOf)
     emptyHeader.writeTo(byteBuffer)
@@ -270,7 +269,7 @@ class SocketServerTest extends JUnitSuite {
       val ackTimeoutMs = 10000
       val ack = 0: Short
       val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
-      val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
+      val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]())
 
       val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf() + emptyRequest.sizeOf())
       emptyHeader.writeTo(byteBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 52809b3..4205ccb 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -172,7 +172,7 @@ class CreateTopicsRequestTest extends BaseRequestTest {
   private def duplicateFirstTopic(request: CreateTopicsRequest) = {
     val struct = request.toStruct
     val topics = struct.getArray("create_topic_requests")
-    val firstTopic= topics(0).asInstanceOf[Struct]
+    val firstTopic = topics(0).asInstanceOf[Struct]
     val newTopics = firstTopic :: topics.toList
     struct.set("create_topic_requests", newTopics.toArray)
     new CreateTopicsRequest(struct)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 2ccb7b8..a886609 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -22,13 +22,13 @@ import java.net.Socket
 import java.nio.ByteBuffer
 
 import kafka.integration.KafkaServerTestHarness
-
 import kafka.network.SocketServer
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.types.Type
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests.{ProduceResponse, ResponseHeader, ProduceRequest}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse, ResponseHeader}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -116,7 +116,8 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     val serializedBytes = {
       val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, 2, null, correlationId)
       val messageBytes = "message".getBytes
-      val request = new ProduceRequest(1, 10000, Map(topicPartition -> ByteBuffer.wrap(messageBytes)).asJava)
+      val records = MemoryRecords.readableRecords(ByteBuffer.wrap(messageBytes))
+      val request = new ProduceRequest(1, 10000, Map(topicPartition -> records).asJava)
       val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.sizeOf)
       byteBuffer.put(headerBytes)
       request.writeTo(byteBuffer)

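The hand-rolled wire encoding here is unchanged apart from the records type: size the buffer from header plus body, then serialize both into it. A sketch using the plain RequestHeader constructor seen in SocketServerTest; the client id and correlation id values are illustrative:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.record.MemoryRecords
    import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
    import scala.collection.JavaConverters._

    val tp = new TopicPartition("topic", 0)
    val records = MemoryRecords.readableRecords(ByteBuffer.wrap("message".getBytes))
    val request = new ProduceRequest(1, 10000, Map(tp -> records).asJava)
    val header = new RequestHeader(ApiKeys.PRODUCE.id, "client", 0)
    // Allocate exactly header + body, then write both into the one buffer.
    val buffer = ByteBuffer.allocate(header.sizeOf + request.sizeOf)
    header.writeTo(buffer)
    request.writeTo(buffer)
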
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index bb0e060..3d1b485 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -129,7 +129,7 @@ class FetchRequestTest extends BaseRequestTest {
     val size3 = logEntries(partitionData3).map(_.size).sum
     assertTrue(s"Expected $size3 to be smaller than $maxResponseBytes", size3 <= maxResponseBytes)
     assertTrue(s"Expected $size3 to be larger than $maxPartitionBytes", size3 > maxPartitionBytes)
-    assertTrue(maxPartitionBytes < MemoryRecords.readableRecords(partitionData3.recordSet).sizeInBytes)
+    assertTrue(maxPartitionBytes < partitionData3.records.sizeInBytes)
 
     // 4. Partition with message larger than the response limit at the start of the list
     val shuffledTopicPartitions4 = Seq(partitionWithLargeMessage2, partitionWithLargeMessage1) ++
@@ -146,7 +146,7 @@ class FetchRequestTest extends BaseRequestTest {
     assertTrue(partitionData4.highWatermark > 0)
     val size4 = logEntries(partitionData4).map(_.size).sum
     assertTrue(s"Expected $size4 to be larger than $maxResponseBytes", size4 > maxResponseBytes)
-    assertTrue(maxResponseBytes < MemoryRecords.readableRecords(partitionData4.recordSet).sizeInBytes)
+    assertTrue(maxResponseBytes < partitionData4.records.sizeInBytes)
   }
 
   @Test
@@ -160,12 +160,12 @@ class FetchRequestTest extends BaseRequestTest {
     val partitionData = fetchResponse.responseData.get(topicPartition)
     assertEquals(Errors.NONE.code, partitionData.errorCode)
     assertTrue(partitionData.highWatermark > 0)
-    assertEquals(maxPartitionBytes, MemoryRecords.readableRecords(partitionData.recordSet).sizeInBytes)
+    assertEquals(maxPartitionBytes, partitionData.records.sizeInBytes)
     assertEquals(0, logEntries(partitionData).map(_.size).sum)
   }
 
   private def logEntries(partitionData: FetchResponse.PartitionData): Seq[LogEntry] = {
-    val memoryRecords = MemoryRecords.readableRecords(partitionData.recordSet)
+    val memoryRecords = partitionData.records
     memoryRecords.iterator.asScala.toIndexedSeq
   }
 
@@ -181,7 +181,7 @@ class FetchRequestTest extends BaseRequestTest {
       assertEquals(Errors.NONE.code, partitionData.errorCode)
       assertTrue(partitionData.highWatermark > 0)
 
-      val memoryRecords = MemoryRecords.readableRecords(partitionData.recordSet)
+      val memoryRecords = partitionData.records
       responseBufferSize += memoryRecords.sizeInBytes
 
       val messages = memoryRecords.iterator.asScala.toIndexedSeq

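On the fetch side, FetchResponse.PartitionData now exposes its payload as records directly, so the MemoryRecords.readableRecords(recordSet) re-wrapping disappears. Reading the fetched size, as a sketch; the fetchedBytes helper is hypothetical:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.FetchResponse

    def fetchedBytes(fetchResponse: FetchResponse, tp: TopicPartition): Int = {
      val partitionData = fetchResponse.responseData.get(tp)
      // records is already usable as-is; no conversion from a raw buffer.
      partitionData.records.sizeInBytes
    }
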
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 2e24aae..5726152 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -21,7 +21,7 @@ import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
 import kafka.api.LeaderAndIsr
-import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, PartitionState}
+import org.apache.kafka.common.requests._
 import org.junit.Assert._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.cluster.Broker
@@ -156,7 +156,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     }
   }
 
-  private def staleControllerEpochCallback(response: AbstractRequestResponse): Unit = {
+  private def staleControllerEpochCallback(response: AbstractResponse): Unit = {
     val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
     staleControllerEpochDetected = Errors.forCode(leaderAndIsrResponse.errorCode) match {
       case Errors.STALE_CONTROLLER_EPOCH => true

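Callbacks that previously accepted AbstractRequestResponse now take the response-side supertype AbstractResponse and narrow to the concrete class, as the hunk above shows. The pattern in isolation; the staleControllerEpoch helper name is illustrative:

    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.{AbstractResponse, LeaderAndIsrResponse}

    // Narrow the generic response and inspect its error code.
    def staleControllerEpoch(response: AbstractResponse): Boolean = {
      val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
      Errors.forCode(leaderAndIsrResponse.errorCode) == Errors.STALE_CONTROLLER_EPOCH
    }
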
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 9e4c800..bd74dee 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
-import org.apache.kafka.common.record.{CompressionType, Record}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
 import org.junit.Test
@@ -42,7 +42,7 @@ class ProduceRequestTest extends BaseRequestTest {
 
     def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = {
       val topicPartition = new TopicPartition("topic", partition)
-      val partitionRecords = Map(topicPartition -> recordBuffer)
+      val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
       val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
       assertEquals(1, produceResponse.responses.size)
       val (tp, partitionResponse) = produceResponse.responses.asScala.head
@@ -78,7 +78,7 @@ class ProduceRequestTest extends BaseRequestTest {
     // Change the lz4 checksum value so that it doesn't match the contents
     recordBuffer.array.update(40, 0)
     val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = Map(topicPartition -> recordBuffer)
+    val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
     val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 17e0516..e226833 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -20,20 +20,20 @@ package kafka.server
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.api._
 import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import org.easymock.EasyMock._
 import org.junit.Assert._
 import org.junit.{After, Test}
 
-
 class ReplicaManagerQuotasTest {
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, new Properties()))
   val time = new MockTime
@@ -42,7 +42,8 @@ class ReplicaManagerQuotasTest {
   val message = new Message("some-data-in-a-message".getBytes())
   val topicAndPartition1 = TopicAndPartition("test-topic", 1)
   val topicAndPartition2 = TopicAndPartition("test-topic", 2)
-  val fetchInfo = Seq(topicAndPartition1 -> PartitionFetchInfo(0, 100), topicAndPartition2 -> PartitionFetchInfo(0, 100))
+  val fetchInfo = Seq(new TopicPartition(topicAndPartition1.topic, topicAndPartition1.partition) -> new PartitionData(0, 100),
+    new TopicPartition(topicAndPartition2.topic, topicAndPartition2.partition) -> new PartitionData(0, 100))
   var replicaManager: ReplicaManager = null
 
   @Test
@@ -143,7 +144,7 @@ class ReplicaManagerQuotasTest {
       fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
   }
 
-  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message, bothReplicasInSync: Boolean = false) {
+  def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], message: Message = this.message, bothReplicasInSync: Boolean = false) {
     val zkUtils = createNiceMock(classOf[ZkUtils])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -185,7 +186,7 @@ class ReplicaManagerQuotasTest {
       partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
       val followerReplica = new Replica(configs.last.brokerId, partition, time, 0, Some(log))
       val allReplicas = Set(leaderReplica, followerReplica)
-      allReplicas.foreach(partition.addReplicaIfNotExists(_))
+      allReplicas.foreach(partition.addReplicaIfNotExists)
       if (bothReplicasInSync) {
         partition.inSyncReplicas = allReplicas
         followerReplica.highWatermark = new LogOffsetMetadata(5)

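Here and in the ReplicaManagerTest and SimpleFetchTest diffs that follow, fetch bookkeeping migrates from the Scala-side (TopicAndPartition, PartitionFetchInfo) pairs to the client-side (TopicPartition, FetchRequest.PartitionData). The translation is mechanical; a sketch with a hypothetical toFetchInfo helper:

    import kafka.common.TopicAndPartition
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.FetchRequest.PartitionData

    // Old: topicAndPartition -> PartitionFetchInfo(offset, fetchSize)
    def toFetchInfo(tap: TopicAndPartition, offset: Long, fetchSize: Int): (TopicPartition, PartitionData) =
      new TopicPartition(tap.topic, tap.partition) -> new PartitionData(offset, fetchSize)
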
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ca8d712..243e06e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,7 +21,7 @@ package kafka.server
 import java.io.File
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
+import kafka.api.FetchResponsePartitionData
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
@@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
@@ -173,7 +174,7 @@ class ReplicaManagerTest {
         fetchMinBytes = 100000,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(0, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 100000)),
         responseCallback = fetchCallback)
 
       // Make this replica the follower
@@ -244,7 +245,7 @@ class ReplicaManagerTest {
         fetchMinBytes = 0,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
         responseCallback = fetchCallback)
         
       
@@ -260,7 +261,7 @@ class ReplicaManagerTest {
         fetchMinBytes = 0,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
         responseCallback = fetchCallback)
           
         assertTrue(fetchCallbackFired)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index cbd751b..340b05f 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -19,19 +19,18 @@ package kafka.server
 import kafka.api._
 import kafka.utils._
 import kafka.cluster.Replica
-import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
-
-import org.junit.{Test, After, Before}
-
-import java.util.{Properties}
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.junit.{After, Before, Test}
+import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-import collection.JavaConversions._
 
+import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
 import org.easymock.EasyMock
 import org.junit.Assert._
 
@@ -63,7 +62,7 @@ class SimpleFetchTest {
   val partitionId = 0
   val topicAndPartition = TopicAndPartition(topic, partitionId)
 
-  val fetchInfo = Seq(topicAndPartition -> PartitionFetchInfo(0, fetchSize))
+  val fetchInfo = Seq(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) -> new PartitionData(0, fetchSize))
 
   var replicaManager: ReplicaManager = null
 
@@ -117,7 +116,7 @@ class SimpleFetchTest {
 
     // add both of them to ISR
     val allReplicas = List(leaderReplica, followerReplica)
-    allReplicas.foreach(partition.addReplicaIfNotExists(_))
+    allReplicas.foreach(partition.addReplicaIfNotExists)
     partition.inSyncReplicas = allReplicas.toSet
   }
 

