kafka-commits mailing list archives

From jjko...@apache.org
Subject [2/2] git commit: KAFKA-1583; Encapsulate replicated log implementation details into ReplicaManager and refactor KafkaApis; reviewed by Joel Koshy and Jun Rao
Date Thu, 30 Oct 2014 01:57:06 GMT
KAFKA-1583; Encapsulate replicated log implementation details into
ReplicaManager and refactor KafkaApis; reviewed by Joel Koshy and Jun
Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89831204
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89831204
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89831204

Branch: refs/heads/trunk
Commit: 89831204c092f3a417bf41945925a2e9a0ec828e
Parents: 20f5b01
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Wed Oct 29 17:11:03 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Oct 29 18:56:23 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/FetchRequest.scala |   1 -
 .../main/scala/kafka/api/FetchResponse.scala    |  12 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |  24 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   1 -
 .../main/scala/kafka/api/ProducerResponse.scala |   3 +-
 .../main/scala/kafka/cluster/Partition.scala    |  50 ++-
 .../main/scala/kafka/common/ErrorMapping.scala  |   2 +
 core/src/main/scala/kafka/log/Log.scala         |  26 +-
 .../kafka/network/BoundedByteBufferSend.scala   |   4 +-
 .../main/scala/kafka/server/DelayedFetch.scala  | 125 ++++---
 .../scala/kafka/server/DelayedProduce.scala     | 135 +++----
 .../kafka/server/FetchRequestPurgatory.scala    |  69 ----
 .../src/main/scala/kafka/server/KafkaApis.scala | 325 ++++++----------
 .../main/scala/kafka/server/OffsetManager.scala |  93 ++++-
 .../kafka/server/ProducerRequestPurgatory.scala |  69 ----
 .../scala/kafka/server/ReplicaManager.scala     | 370 +++++++++++++------
 .../scala/kafka/server/RequestPurgatory.scala   | 336 ++++++++---------
 .../main/scala/kafka/utils/DelayedItem.scala    |   8 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  18 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |   4 +-
 .../server/HighwatermarkPersistenceTest.scala   |   7 +
 .../unit/kafka/server/ISRExpirationTest.scala   |  17 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |   6 +
 .../kafka/server/RequestPurgatoryTest.scala     | 103 +++---
 .../unit/kafka/server/ServerShutdownTest.scala  |   2 +-
 .../unit/kafka/server/SimpleFetchTest.scala     | 270 ++++----------
 26 files changed, 1006 insertions(+), 1074 deletions(-)
----------------------------------------------------------------------
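For readers skimming the diff below: the core of the refactoring is that
DelayedFetch and DelayedProduce no longer hold on to a RequestChannel.Request
and build their own responses. They now extend a DelayedRequest base class
with a tryComplete() / onComplete() / forceComplete() contract and hand their
result to a response callback supplied by ReplicaManager. A rough sketch of
that contract, inferred only from the call sites visible in this diff (the
actual base class is defined in core/src/main/scala/kafka/server/RequestPurgatory.scala,
which is not reproduced in this excerpt, and differs in detail):

  // Sketch only -- not the actual kafka.server.RequestPurgatory code.
  import java.util.concurrent.atomic.AtomicBoolean

  abstract class DelayedRequestSketch(val delayMs: Long) {
    private val completed = new AtomicBoolean(false)

    // Checks whether the operation can be completed now; implementations call
    // forceComplete() themselves when the answer is yes.
    def tryComplete(): Boolean

    // Builds the result and hands it to the response callback, exactly once.
    def onComplete(): Unit

    // Guarantees onComplete() runs at most once, whether completion is
    // triggered by tryComplete() or by expiration after delayMs.
    def forceComplete(): Boolean =
      if (completed.compareAndSet(false, true)) {
        onComplete()
        true
      } else {
        false
      }
  }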


http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 59c0915..b038c15 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -30,7 +30,6 @@ import scala.collection.immutable.Map
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
-
 object FetchRequest {
   val CurrentVersion = 0.shortValue
   val DefaultMaxWait = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 8d085a1..75aaf57 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -25,6 +25,8 @@ import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.network.{MultiSend, Send}
 import kafka.api.ApiUtils._
 
+import scala.collection._
+
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
     val error = buffer.getShort
@@ -150,10 +152,8 @@ object FetchResponse {
   }
 }
 
-
-case class FetchResponse(correlationId: Int,
-                         data: Map[TopicAndPartition, FetchResponsePartitionData])
-    extends RequestOrResponse() {
+case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData])
+  extends RequestOrResponse() {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
@@ -171,8 +171,8 @@ case class FetchResponse(correlationId: Int,
 
   /*
    * FetchResponse uses [sendfile](http://man7.org/linux/man-pages/man2/sendfile.2.html)
-   * api for data transfer, so `writeTo` aren't actually being used.
-   * It is implemented as an empty function to comform to `RequestOrResponse.writeTo`
+   * api for data transfer through the FetchResponseSend, so `writeTo` isn't actually being used.
+   * It is implemented as an empty function to conform to `RequestOrResponse.writeTo`
    * abstract method signature.
    */
   def writeTo(buffer: ByteBuffer): Unit = throw new UnsupportedOperationException

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 861a6cf..050615c 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -78,28 +78,12 @@ case class OffsetCommitRequest(groupId: String,
                                groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
                                consumerId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID)
     extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+
   assert(versionId == 0 || versionId == 1,
          "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.")
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
-  def filterLargeMetadata(maxMetadataSize: Int) =
-    requestInfo.filter(info => info._2.metadata == null || info._2.metadata.length <= maxMetadataSize)
-
-  def responseFor(errorCode: Short, offsetMetadataMaxSize: Int) = {
-    val commitStatus = requestInfo.map {info =>
-      (info._1, if (info._2.metadata != null && info._2.metadata.length > offsetMetadataMaxSize)
-                  ErrorMapping.OffsetMetadataTooLargeCode
-                else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode)
-                  ErrorMapping.ConsumerCoordinatorNotAvailableCode
-                else if (errorCode == ErrorMapping.NotLeaderForPartitionCode)
-                  ErrorMapping.NotCoordinatorForConsumerCode
-                else
-                  errorCode)
-    }.toMap
-    OffsetCommitResponse(commitStatus, correlationId)
-  }
-
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
     buffer.putShort(versionId)
@@ -150,8 +134,10 @@ case class OffsetCommitRequest(groupId: String,
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
-    val errorResponse = responseFor(errorCode, Int.MaxValue)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    val commitStatus = requestInfo.mapValues(_ => errorCode)
+    val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
+
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index b2366e7..570b2da 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -153,7 +153,6 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     producerRequest.toString()
   }
 
-
   def emptyData(){
     data.clear()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index a286272..5d1fac4 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -43,8 +43,7 @@ object ProducerResponse {
 
 case class ProducerResponseStatus(var error: Short, offset: Long)
 
-case class ProducerResponse(correlationId: Int,
-                            status: Map[TopicAndPartition, ProducerResponseStatus])
+case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus])
     extends RequestOrResponse() {
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e88ecf2..1be5700 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -230,7 +230,33 @@ class Partition(val topic: String,
     }
   }
 
-  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) {
+  /**
+   * Update the log end offset of a certain replica of this partition
+   */
+  def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) = {
+    getReplica(replicaId) match {
+      case Some(replica) =>
+        replica.logEndOffset = offset
+
+        // check if we need to expand ISR to include this replica
+        // if it is not in the ISR yet
+        maybeExpandIsr(replicaId)
+
+        debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
+          .format(replicaId, offset.messageOffset, TopicAndPartition(topic, partitionId)))
+      case None =>
+        throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
+          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
+          offset.messageOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+    }
+  }
+
+  /**
+   * Check and maybe expand the ISR of the partition.
+   *
+   * This function can be triggered when a replica's LEO has incremented
+   */
+  def maybeExpandIsr(replicaId: Int) {
     inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
       leaderReplicaIfLocal() match {
@@ -252,7 +278,10 @@ class Partition(val topic: String,
             updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
           }
+          // check if the HW of the partition can now be incremented
+          // since the replica may now be in the ISR and its LEO has just incremented
           maybeIncrementLeaderHW(leaderReplica)
+
         case None => // nothing to do if no longer leader
       }
     }
@@ -272,6 +301,7 @@ class Partition(val topic: String,
         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
         trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+
         if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
           /*
           * requiredAcks < 0 means acknowledge after all replicas in ISR
@@ -298,8 +328,14 @@ class Partition(val topic: String,
   }
 
   /**
-   * There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
-   * @param leaderReplica
+   * Check and maybe increment the high watermark of the partition;
+   * this function can be triggered when
+   *
+   * 1. Partition ISR changed
+   * 2. Any replica's LEO changed
+   *
+   * Note: there is no need to acquire the leaderIsrUpdate lock here
+   * since all callers of this private API acquire that lock
    */
   private def maybeIncrementLeaderHW(leaderReplica: Replica) {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
@@ -310,8 +346,8 @@ class Partition(val topic: String,
       debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
       // some delayed requests may be unblocked after HW changed
       val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId)
-      replicaManager.unblockDelayedFetchRequests(requestKey)
-      replicaManager.unblockDelayedProduceRequests(requestKey)
+      replicaManager.tryCompleteDelayedFetch(requestKey)
+      replicaManager.tryCompleteDelayedProduce(requestKey)
     } else {
       debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
         .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
@@ -362,7 +398,7 @@ class Partition(val topic: String,
     stuckReplicas ++ slowReplicas
   }
 
-  def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
+  def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
     inReadLock(leaderIsrUpdateLock) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
@@ -379,7 +415,7 @@ class Partition(val topic: String,
 
           val info = log.append(messages, assignOffsets = true)
           // probably unblock some follower fetch requests since log end offset has been updated
-          replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
+          replicaManager.tryCompleteDelayedFetch(new TopicPartitionRequestKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           info

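As background for the maybeIncrementLeaderHW() change above: the leader's
high watermark only ever moves forward, to the smallest log end offset among
the current in-sync replicas. A simplified illustration using plain Long
offsets instead of LogOffsetMetadata (this is not the code in Partition.scala):

  // Simplified rule: new HW = max(old HW, min LEO over the ISR).
  def nextHighWatermark(isrLogEndOffsets: Seq[Long], oldHighWatermark: Long): Long = {
    val candidate = isrLogEndOffsets.min
    if (candidate > oldHighWatermark) candidate else oldHighWatermark
  }

  // Example: with ISR LEOs 10, 8 and 9 and an old HW of 7, the HW advances to 8;
  // if a slow follower's LEO were still 6, the HW would stay at 7.
  assert(nextHighWatermark(Seq(10L, 8L, 9L), 7L) == 8L)
  assert(nextHighWatermark(Seq(10L, 6L, 9L), 7L) == 7L)
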
http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 880ab4a..eedc2f5 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -84,4 +84,6 @@ object ErrorMapping {
       throw codeToException(code).newInstance()
 
   def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance()
+
+  def exceptionNameFor(code: Short) : String = codeToException(code).getName()
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 157d673..37b4a85 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -31,6 +31,21 @@ import scala.collection.JavaConversions
 
 import com.yammer.metrics.core.Gauge
 
+object LogAppendInfo {
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, -1, -1, false)
+}
+
+/**
+ * Struct to hold various quantities we compute about each message set before appending to the log
+ * @param firstOffset The first offset in the message set
+ * @param lastOffset The last offset in the message set
+ * @param shallowCount The number of shallow messages
+ * @param validBytes The number of valid bytes
+ * @param codec The codec used in the message set
+ * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ */
+case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
+
 
 /**
  * An append-only log for storing messages.
@@ -311,17 +326,6 @@ class Log(val dir: File,
   }
   
   /**
-   * Struct to hold various quantities we compute about each message set before appending to the log
-   * @param firstOffset The first offset in the message set
-   * @param lastOffset The last offset in the message set
-   * @param shallowCount The number of shallow messages
-   * @param validBytes The number of valid bytes
-   * @param codec The codec used in the message set
-   * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
-   */
-  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
-
-  /**
    * Validate the following:
    * <ol>
    * <li> each message matches its CRC

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
index a624359..55ecac2 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
@@ -25,7 +25,7 @@ import kafka.api.RequestOrResponse
 @nonthreadsafe
 private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
   
-  private var sizeBuffer = ByteBuffer.allocate(4)
+  private val sizeBuffer = ByteBuffer.allocate(4)
 
   // Avoid possibility of overflow for 2GB-4 byte buffer
   if(buffer.remaining > Int.MaxValue - sizeBuffer.limit)
@@ -53,7 +53,7 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send
   
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
-    var written = channel.write(Array(sizeBuffer, buffer))
+    val written = channel.write(Array(sizeBuffer, buffer))
     // if we are done, mark it off
     if(!buffer.hasRemaining)
       complete = true    

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e0f14e2..1ccbb4b 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -17,75 +17,110 @@
 
 package kafka.server
 
-import kafka.network.RequestChannel
-import kafka.api.{FetchResponse, FetchRequest}
-import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition}
+import kafka.api.FetchResponsePartitionData
+import kafka.api.PartitionFetchInfo
+import kafka.common.UnknownTopicOrPartitionException
+import kafka.common.NotLeaderForPartitionException
+import kafka.common.TopicAndPartition
 
-import scala.collection.immutable.Map
-import scala.collection.Seq
+import scala.collection._
+
+case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) {
+
+  override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", " +
+                          "fetchInfo: " + fetchInfo + "]"
+}
 
 /**
- * A delayed fetch request, which is satisfied (or more
- * accurately, unblocked) -- if:
- * Case A: This broker is no longer the leader for some partitions it tries to fetch
- *   - should return whatever data is available for the rest partitions.
- * Case B: This broker is does not know of some partitions it tries to fetch
- *   - should return whatever data is available for the rest partitions.
- * Case C: The fetch offset locates not on the last segment of the log
- *   - should return all the data on that segment.
- * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
- *   - should return whatever data is available.
+ * The fetch metadata maintained by the delayed fetch request
  */
+case class FetchMetadata(fetchMinBytes: Int,
+                         fetchOnlyLeader: Boolean,
+                         fetchOnlyCommitted: Boolean,
+                         fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {
+
+  override def toString = "[minBytes: " + fetchMinBytes + ", " +
+                          "onlyLeader:" + fetchOnlyLeader + ", "
+                          "onlyCommitted: " + fetchOnlyCommitted + ", "
+                          "partitionStatus: " + fetchPartitionStatus + "]"
+}
 
-class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey],
-                   override val request: RequestChannel.Request,
-                   override val delayMs: Long,
-                   val fetch: FetchRequest,
-                   private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata])
-  extends DelayedRequest(keys, request, delayMs) {
+/**
+ * A delayed fetch request that can be created by the replica manager and watched
+ * in the fetch request purgatory
+ */
+class DelayedFetch(delayMs: Long,
+                   fetchMetadata: FetchMetadata,
+                   replicaManager: ReplicaManager,
+                   responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
+  extends DelayedRequest(delayMs) {
 
-  def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
+  /**
+   * The request can be completed if:
+   *
+   * Case A: This broker is no longer the leader for some partitions it tries to fetch
+   * Case B: This broker does not know of some partitions it tries to fetch
+   * Case C: The fetch offset is not on the last segment of the log
+   * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   *
+   * Upon completion, should return whatever data is available for each valid partition
+   */
+  override def tryComplete() : Boolean = {
     var accumulatedSize = 0
-    val fromFollower = fetch.isFromFollower
-    partitionFetchOffsets.foreach {
-      case (topicAndPartition, fetchOffset) =>
+    fetchMetadata.fetchPartitionStatus.foreach {
+      case (topicAndPartition, fetchStatus) =>
+        val fetchOffset = fetchStatus.startOffsetMetadata
         try {
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
             val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
             val endOffset =
-              if (fromFollower)
-                replica.logEndOffset
-              else
+              if (fetchMetadata.fetchOnlyCommitted)
                 replica.highWatermark
+              else
+                replica.logEndOffset
 
             if (endOffset.offsetOnOlderSegment(fetchOffset)) {
-              // Case C, this can happen when the new follower replica fetching on a truncated leader
-              debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition))
-              return true
+              // Case C, this can happen when the new fetch operation is on a truncated leader
+              debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
+              return forceComplete()
             } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
-              // Case C, this can happen when the folloer replica is lagging too much
-              debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch))
-              return true
+              // Case C, this can happen when the fetch operation is falling behind the current segment
+              // or the partition has just rolled a new segment
+              debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
+              return forceComplete()
             } else if (fetchOffset.precedes(endOffset)) {
-              accumulatedSize += endOffset.positionDiff(fetchOffset)
+              // we need to take the partition fetch size as an upper bound when accumulating the bytes
+              accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
             }
           }
         } catch {
-          case utpe: UnknownTopicOrPartitionException => // Case A
-            debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch))
-            return true
-          case nle: NotLeaderForPartitionException =>  // Case B
-            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch))
-            return true
+          case utpe: UnknownTopicOrPartitionException => // Case B
+            debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
+            return forceComplete()
+          case nle: NotLeaderForPartitionException =>  // Case A
+            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
+            return forceComplete()
         }
     }
 
     // Case D
-    accumulatedSize >= fetch.minBytes
+    if (accumulatedSize >= fetchMetadata.fetchMinBytes)
+      forceComplete()
+    else
+      false
   }
 
-  def respond(replicaManager: ReplicaManager): FetchResponse = {
-    val topicData = replicaManager.readMessageSets(fetch)
-    FetchResponse(fetch.correlationId, topicData.mapValues(_.data))
+  /**
+   * Upon completion, read whatever data is available and pass to the complete callback
+   */
+  override def onComplete() {
+    val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
+      fetchMetadata.fetchOnlyCommitted,
+      fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))
+
+    val fetchPartitionData = logReadResults.mapValues(result =>
+      FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
+
+    responseCallback(fetchPartitionData)
   }
 }
\ No newline at end of file

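One behavioral change worth calling out in tryComplete() above: a partition's
contribution toward fetchMinBytes is now capped by that partition's own fetch
size (the math.min in Case D). A self-contained illustration with made-up sizes:

  // Each partition can contribute at most its own fetchSize toward minBytes,
  // even if far more data is available on the leader.
  case class PartitionProgress(availableBytes: Int, fetchSize: Int)

  def accumulatedBytes(partitions: Seq[PartitionProgress]): Int =
    partitions.map(p => math.min(p.availableBytes, p.fetchSize)).sum

  // Two partitions with 1 MB available each but a 64 KB fetch size each now
  // count as 128 KB, so a fetch with minBytes = 256 KB keeps waiting instead
  // of being completed with less data than the response can actually carry.
  assert(accumulatedBytes(Seq(PartitionProgress(1 << 20, 64 << 10),
                              PartitionProgress(1 << 20, 64 << 10))) == (128 << 10))
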
http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 9481508..8049e07 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -17,99 +17,106 @@
 
 package kafka.server
 
-import kafka.api._
+
+import kafka.api.ProducerResponseStatus
 import kafka.common.ErrorMapping
 import kafka.common.TopicAndPartition
-import kafka.utils.Logging
-import kafka.network.RequestChannel
 
 import scala.Some
-import scala.collection.immutable.Map
-import scala.collection.Seq
-
-/** A delayed produce request, which is satisfied (or more
-  * accurately, unblocked) -- if for every partition it produce to:
-  * Case A: This broker is not the leader: unblock - should return error.
-  * Case B: This broker is the leader:
-  *   B.1 - If there was a localError (when writing to the local log): unblock - should return error
-  *   B.2 - else, at least requiredAcks replicas should be caught up to this request.
-  */
+import scala.collection._
 
-class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
-                     override val request: RequestChannel.Request,
-                     override val delayMs: Long,
-                     val produce: ProducerRequest,
-                     val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
-                     val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
-  extends DelayedRequest(keys, request, delayMs) with Logging {
+case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) {
+  @volatile var acksPending = false
 
-  // first update the acks pending variable according to the error code
-  partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
-    if (delayedStatus.responseStatus.error == ErrorMapping.NoError) {
-      // Timeout error state will be cleared when required acks are received
-      delayedStatus.acksPending = true
-      delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode
-    } else {
-      delayedStatus.acksPending = false
-    }
+  override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]"
+    .format(acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+}
 
-    trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
-  }
+/**
+ * The produce metadata maintained by the delayed produce request
+ */
+case class ProduceMetadata(produceRequiredAcks: Short,
+                           produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) {
 
-  def respond(offsetManager: OffsetManager): RequestOrResponse = {
-    val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
+  override def toString = "[requiredAcks: %d, partitionStatus: %s]"
+    .format(produceRequiredAcks, produceStatus)
+}
 
-    val errorCode = responseStatus.find { case (_, status) =>
-      status.error != ErrorMapping.NoError
-    }.map(_._2.error).getOrElse(ErrorMapping.NoError)
+/**
+ * A delayed produce request that can be created by the replica manager and watched
+ * in the produce request purgatory
+ */
+class DelayedProduce(delayMs: Long,
+                     produceMetadata: ProduceMetadata,
+                     replicaManager: ReplicaManager,
+                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+  extends DelayedRequest(delayMs) {
 
-    if (errorCode == ErrorMapping.NoError) {
-      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+  // first update the acks pending variable according to the error code
+  produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
+    if (status.responseStatus.error == ErrorMapping.NoError) {
+      // Timeout error state will be cleared when required acks are received
+      status.acksPending = true
+      status.responseStatus.error = ErrorMapping.RequestTimedOutCode
+    } else {
+      status.acksPending = false
     }
 
-    val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
-      .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
-
-    response
+    trace("Initial partition status for %s is %s".format(topicAndPartition, status))
   }
 
-  def isSatisfied(replicaManager: ReplicaManager) = {
+  /**
+   * The delayed produce request can be completed if every partition
+   * it produces to is satisfied by one of the following:
+   *
+   * Case A: This broker is no longer the leader: set an error in response
+   * Case B: This broker is the leader:
+   *   B.1 - If there was a local error thrown while checking if at least requiredAcks
+   *         replicas have caught up to this request: set an error in response
+   *   B.2 - Otherwise, set the response with no error.
+   */
+  override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
-    partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
-      trace("Checking producer request satisfaction for %s, acksPending = %b"
-        .format(topicAndPartition, fetchPartitionStatus.acksPending))
+    produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
+      trace("Checking produce satisfaction for %s, current status %s"
+        .format(topicAndPartition, status))
       // skip those partitions that have already been satisfied
-      if (fetchPartitionStatus.acksPending) {
+      if (status.acksPending) {
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val (hasEnough, errorCode) = partitionOpt match {
           case Some(partition) =>
             partition.checkEnoughReplicasReachOffset(
-              fetchPartitionStatus.requiredOffset,
-              produce.requiredAcks)
+              status.requiredOffset,
+              produceMetadata.produceRequiredAcks)
           case None =>
+            // Case A
             (false, ErrorMapping.UnknownTopicOrPartitionCode)
         }
         if (errorCode != ErrorMapping.NoError) {
-          fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.responseStatus.error = errorCode
+          // Case B.1
+          status.acksPending = false
+          status.responseStatus.error = errorCode
         } else if (hasEnough) {
-          fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
+          // Case B.2
+          status.acksPending = false
+          status.responseStatus.error = ErrorMapping.NoError
         }
       }
     }
 
-    // unblocked if there are no partitions with pending acks
-    val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
-    satisfied
+    // check if every partition has satisfied at least one of case A and case B
+    if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
+      forceComplete()
+    else
+      false
   }
-}
-
-case class DelayedProduceResponseStatus(val requiredOffset: Long,
-                                        val responseStatus: ProducerResponseStatus) {
-  @volatile var acksPending = false
 
-  override def toString =
-    "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
-      acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+  /**
+   * Upon completion, return the current response status along with the error code per partition
+   */
+  override def onComplete() {
+    val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
+    responseCallback(responseStatus)
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
deleted file mode 100644
index ed13188..0000000
--- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel
-import kafka.api.FetchResponseSend
-
-import java.util.concurrent.TimeUnit
-
-/**
- * The purgatory holding delayed fetch requests
- */
-class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
-  extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
-  this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
-
-  private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
-    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
-
-    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
-  }
-
-  private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
-  private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
-
-  private def recordDelayedFetchExpired(forFollower: Boolean) {
-    val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
-    else aggregateNonFollowerFetchRequestMetrics
-
-    metrics.expiredRequestMeter.mark()
-  }
-
-  /**
-   * Check if a specified delayed fetch request is satisfied
-   */
-  def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
-
-  /**
-   * When a delayed fetch request expires just answer it with whatever data is present
-   */
-  def expire(delayedFetch: DelayedFetch) {
-    debug("Expiring fetch request %s.".format(delayedFetch.fetch))
-    val fromFollower = delayedFetch.fetch.isFromFollower
-    recordDelayedFetchExpired(fromFollower)
-    respond(delayedFetch)
-  }
-
-  // TODO: purgatory should not be responsible for sending back the responses
-  def respond(delayedFetch: DelayedFetch) {
-    val response = delayedFetch.respond(replicaManager)
-    requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 85498b4..968b0c4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -20,7 +20,6 @@ package kafka.server
 import kafka.api._
 import kafka.common._
 import kafka.log._
-import kafka.message._
 import kafka.network._
 import kafka.admin.AdminUtils
 import kafka.network.RequestChannel.Response
@@ -42,12 +41,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val config: KafkaConfig,
                 val controller: KafkaController) extends Logging {
 
-  val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
-  val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
-  // TODO: the following line will be removed in 0.9
-  replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory)
-  var metadataCache = new MetadataCache
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
+  val metadataCache = new MetadataCache
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -56,7 +51,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     try{
       trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
       request.requestId match {
-        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
+        case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
         case RequestKeys.OffsetsKey => handleOffsetRequest(request)
         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
@@ -64,7 +59,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
         case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
-        case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request)
+        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
@@ -123,179 +118,87 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
   }
 
-  private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
-    val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map {
-      case (topicAndPartition, offset) =>
-        new Message(
-          bytes = OffsetManager.offsetCommitValue(offset),
-          key = OffsetManager.offsetCommitKey(offsetCommitRequest.groupId, topicAndPartition.topic, topicAndPartition.partition)
-        )
-    }.toSeq
-
-    val producerData = mutable.Map(
-      TopicAndPartition(OffsetManager.OffsetsTopicName, offsetManager.partitionFor(offsetCommitRequest.groupId)) ->
-        new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, msgs:_*)
-    )
-
-    val request = ProducerRequest(
-      correlationId = offsetCommitRequest.correlationId,
-      clientId = offsetCommitRequest.clientId,
-      requiredAcks = config.offsetCommitRequiredAcks,
-      ackTimeoutMs = config.offsetCommitTimeoutMs,
-      data = producerData)
-    trace("Created producer request %s for offset commit request %s.".format(request, offsetCommitRequest))
-    request
-  }
 
   /**
-   * Handle a produce request or offset commit request (which is really a specialized producer request)
+   * Handle an offset commit request
    */
-  def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
-    val (produceRequest, offsetCommitRequestOpt) =
-      if (request.requestId == RequestKeys.OffsetCommitKey) {
-        val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
-        (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
-      } else {
-        (request.requestObj.asInstanceOf[ProducerRequest], None)
-      }
-
-    val sTime = SystemTime.milliseconds
-    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
-    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
-
-    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
-
-    val numPartitionsInError = localProduceResults.count(_.error.isDefined)
-    if(produceRequest.requiredAcks == 0) {
-      // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
-      // no response is expected by the producer the handler will send a close connection response to the socket server
-      // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
-      if (numPartitionsInError != 0) {
-        info(("Send the close connection response due to error handling produce request " +
-          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
-          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
-        requestChannel.closeConnection(request.processor, request)
-      } else {
-
-        if (firstErrorCode == ErrorMapping.NoError)
-          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
-
-        if (offsetCommitRequestOpt.isDefined) {
-          val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
-          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
-        } else
-          requestChannel.noOperation(request.processor, request)
-      }
-    } else if (produceRequest.requiredAcks == 1 ||
-        produceRequest.numPartitions <= 0 ||
-        numPartitionsInError == produceRequest.numPartitions) {
-
-      if (firstErrorCode == ErrorMapping.NoError) {
-        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+  def handleOffsetCommitRequest(request: RequestChannel.Request) {
+    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+
+    // the callback for sending the response
+    def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
+      commitStatus.foreach { case (topicAndPartition, errorCode) =>
+        // we only print warnings for known errors here; only replica manager could see an unknown
+        // exception while trying to write the offset message to the local log, and it will log
+        // an error message and write the error code in this case; hence it can be ignored here
+        if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) {
+          debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
+            .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
+            topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
+        }
       }
 
-      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
-      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
-                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
-
+      val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
-    } else {
-      // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val producerRequestKeys = produceRequest.data.keys.map(
-        topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq
-      val statuses = localProduceResults.map(r =>
-        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
-      val delayedRequest =  new DelayedProduce(
-        producerRequestKeys,
-        request,
-        produceRequest.ackTimeoutMs.toLong,
-        produceRequest,
-        statuses,
-        offsetCommitRequestOpt)
-
-      // add the produce request for watch if it's not satisfied, otherwise send the response back
-      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
-      if (satisfiedByMe)
-        producerRequestPurgatory.respond(delayedRequest)
     }
 
-    // we do not need the data anymore
-    produceRequest.emptyData()
-  }
-
-  case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
-    def this(key: TopicAndPartition, throwable: Throwable) = 
-      this(key, -1L, -1L, Some(throwable))
-    
-    def errorCode = error match {
-      case None => ErrorMapping.NoError
-      case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]])
-    }
+    // call offset manager to store offsets
+    offsetManager.storeOffsets(
+      offsetCommitRequest.groupId,
+      offsetCommitRequest.consumerId,
+      offsetCommitRequest.groupGenerationId,
+      offsetCommitRequest.requestInfo,
+      sendResponseCallback)
   }
 
   /**
-   * Helper method for handling a parsed producer request
+   * Handle a produce request
    */
-  private def appendToLocalLog(producerRequest: ProducerRequest, isOffsetCommit: Boolean): Iterable[ProduceResult] = {
-    val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
-    trace("Append [%s] to local log ".format(partitionAndData.toString))
-    partitionAndData.map {case (topicAndPartition, messages) =>
-      try {
-        if (Topic.InternalTopics.contains(topicAndPartition.topic) &&
-            !(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) {
-          throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic))
-        }
-        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
-        val info = partitionOpt match {
-          case Some(partition) =>
-            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)
-          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
-            .format(topicAndPartition, brokerId))
+  def handleProducerRequest(request: RequestChannel.Request) {
+    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
+
+    // the callback for sending the response
+    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+      var errorInResponse = false
+      responseStatus.foreach { case (topicAndPartition, status) =>
+        // we only print warnings for known errors here; if it is unknown, it will cause
+        // an error message in the replica manager
+        if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
+          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s"
+            .format(produceRequest.correlationId, produceRequest.clientId,
+            topicAndPartition, ErrorMapping.exceptionNameFor(status.error)))
+          errorInResponse = true
         }
+      }
 
-        val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
+      if (produceRequest.requiredAcks == 0) {
+        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
+        // the request, since no response is expected by the producer, the server will close the connection so that
+        // the producer client will know that some error has happened and will refresh its metadata
+        if (errorInResponse) {
+          info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0"
+            .format(produceRequest.correlationId, produceRequest.clientId))
+          requestChannel.closeConnection(request.processor, request)
+        } else {
+          requestChannel.noOperation(request.processor, request)
+        }
+      } else {
+        val response = ProducerResponse(produceRequest.correlationId, responseStatus)
+        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      }
+    }
 
-        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
-        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+    // call the replica manager to append messages to the replicas
+    replicaManager.appendMessages(
+      produceRequest.ackTimeoutMs.toLong,
+      produceRequest.requiredAcks,
+      produceRequest.data,
+      sendResponseCallback)
 
-        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
-              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
-        ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset)
-      } catch {
-        // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
-        // since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request
-        // for a partition it is the leader for
-        case e: KafkaStorageException =>
-          fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
-          Runtime.getRuntime.halt(1)
-          null
-        case ite: InvalidTopicException =>
-          warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-            producerRequest.correlationId, producerRequest.clientId, topicAndPartition, ite.getMessage))
-          new ProduceResult(topicAndPartition, ite)
-        case utpe: UnknownTopicOrPartitionException =>
-          warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-               producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage))
-          new ProduceResult(topicAndPartition, utpe)
-        case nle: NotLeaderForPartitionException =>
-          warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-               producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
-          new ProduceResult(topicAndPartition, nle)
-        case nere: NotEnoughReplicasException =>
-          warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-            producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nere.getMessage))
-          new ProduceResult(topicAndPartition, nere)
-        case e: Throwable =>
-          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
-          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-          error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
-            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition), e)
-          new ProduceResult(topicAndPartition, e)
-       }
-    }
+    // if the request is put into the purgatory, a reference to it will be held
+    // and it cannot be garbage collected; we therefore clear its data here so
+    // that the GC can reclaim the memory, since the data is already appended to the log
+    produceRequest.emptyData()
   }
 
   /**
@@ -303,59 +206,38 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
-    val dataRead = replicaManager.readMessageSets(fetchRequest)
-
-    // if the fetch request comes from the follower,
-    // update its corresponding log end offset
-    if(fetchRequest.isFromFollower)
-      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
-
-    // check if this fetch request can be satisfied right away
-    val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
-    val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
-      errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
-    // send the data immediately if 1) fetch request does not want to wait
-    //                              2) fetch request does not require any data
-    //                              3) has enough data to respond
-    //                              4) some error happens while reading data
-    if(fetchRequest.maxWait <= 0 ||
-       fetchRequest.numPartitions <= 0 ||
-       bytesReadable >= fetchRequest.minBytes ||
-       errorReadingData) {
-      debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
-        .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
-      val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
-    } else {
-      debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
-        fetchRequest.clientId))
-      // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
-      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
-        dataRead.mapValues(_.offset))
-
-      // add the fetch request for watch if it's not satisfied, otherwise send the response back
-      val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
-      if (satisfiedByMe)
-        fetchRequestPurgatory.respond(delayedFetch)
-    }
-  }
 
-  private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
-    debug("Record follower log end offsets: %s ".format(offsets))
-    offsets.foreach {
-      case (topicAndPartition, offset) =>
-        replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
-          topicAndPartition.partition, replicaId, offset)
+    // the callback for sending the response
+    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
+      responsePartitionData.foreach { case (topicAndPartition, data) =>
+        // we only log known errors here, and only at debug level; an unknown error will
+        // already have produced an error-level message in the replica manager, so it is skipped here
+        if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) {
+          debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s"
+            .format(fetchRequest.correlationId, fetchRequest.clientId,
+            topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))
+        }
+
+        // record the bytes out metrics only when the response is being sent
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
+      }
 
-        // for producer requests with ack > 1, we need to check
-        // if they can be unblocked after some follower's log end offsets have moved
-        replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
+      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     }
+
+    // call the replica manager to fetch messages from the local replica
+    replicaManager.fetchMessages(
+      fetchRequest.maxWait.toLong,
+      fetchRequest.replicaId,
+      fetchRequest.minBytes,
+      fetchRequest.requestInfo,
+      sendResponseCallback)
   }
 
   /**
-   * Service the offset request API 
+   * Handle an offset request
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
@@ -387,15 +269,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
         // are typically transient and there is no value in logging the entire stack trace for the same
         case utpe: UnknownTopicOrPartitionException =>
-          warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
+          debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage))
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) )
         case nle: NotLeaderForPartitionException =>
-          warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
+          debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
         case e: Throwable =>
-          warn("Error while responding to offset request", e)
+          error("Error while responding to offset request", e)
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
       }
     })
@@ -415,7 +297,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
   
-  def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
     if(segsArray.last.size > 0)
@@ -488,7 +370,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /**
-   * Service the topic metadata request API
+   * Handle a topic metadata request
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
@@ -500,7 +382,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /*
-   * Service the Offset fetch API
+   * Handle an offset fetch request
    */
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
@@ -520,7 +402,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /*
-   * Service the consumer metadata API
+   * Handle a consumer metadata request
    */
   def handleConsumerMetadataRequest(request: RequestChannel.Request) {
     val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
@@ -545,9 +427,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def close() {
-    debug("Shutting down.")
-    fetchRequestPurgatory.shutdown()
-    producerRequestPurgatory.shutdown()
+    // TODO: closing the API layer is currently a no-op since it no longer maintains any modules;
+    // consider removing this call altogether once KafkaApis becomes a purely stateless layer
     debug("Shut down complete.")
   }
 }

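For readers following the KafkaApis changes above, here is a minimal, self-contained sketch (Scala, illustrative names only; these are not the refactored Kafka classes) of the callback-passing style adopted in the new handleFetchRequest: the API layer builds a response callback and hands it to the storage layer, which either answers immediately or, in the real code, parks a delayed operation in a purgatory and invokes the callback later. This is also why KafkaApis.close() above becomes a no-op: the purgatories and the response plumbing now live behind ReplicaManager.

object CallbackStyleSketch {

  // hypothetical stand-ins for TopicAndPartition / FetchResponsePartitionData
  case class Partition(topic: String, id: Int)
  case class PartitionData(error: Short, bytes: Int)

  // hypothetical stand-in for ReplicaManager.fetchMessages: it reads locally and,
  // if enough bytes are available (or the caller will not wait), responds at once;
  // a real implementation would otherwise park a delayed operation in a purgatory
  def fetchMessages(maxWaitMs: Long,
                    minBytes: Int,
                    fetchInfo: Map[Partition, Int],
                    responseCallback: Map[Partition, PartitionData] => Unit): Unit = {
    val readResults = fetchInfo.map { case (p, fetchSize) =>
      p -> PartitionData(error = 0, bytes = math.min(fetchSize, 1024)) // pretend read
    }
    val bytesReadable = readResults.values.map(_.bytes).sum
    if (maxWaitMs <= 0 || bytesReadable >= minBytes)
      responseCallback(readResults)
  }

  def main(args: Array[String]): Unit = {
    // the "API layer": all response wiring lives in this closure
    def sendResponseCallback(data: Map[Partition, PartitionData]): Unit =
      data.foreach { case (p, d) => println(s"$p -> error=${d.error} bytes=${d.bytes}") }

    fetchMessages(maxWaitMs = 0, minBytes = 1,
      fetchInfo = Map(Partition("test", 0) -> 4096),
      responseCallback = sendResponseCallback)
  }
}
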
http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 43eb2a3..2957bc4 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.ZkClient
+import kafka.api.ProducerResponseStatus
 
 
 /**
@@ -144,6 +145,8 @@ class OffsetManager(val config: OffsetManagerConfig,
         trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
 
         try {
+          // there is no need to require acks since even if the tombstone is lost,
+          // it will be appended again in the next purge cycle
           partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
           tombstones.size
         }
@@ -192,13 +195,91 @@ class OffsetManager(val config: OffsetManagerConfig,
     offsetsCache.put(key, offsetAndMetadata)
   }
 
-  def putOffsets(group: String, offsets: Map[TopicAndPartition, OffsetAndMetadata]) {
-    // this method is called _after_ the offsets have been durably appended to the commit log, so there is no need to
-    // check for current leadership as we do for the offset fetch
-    trace("Putting offsets %s for group %s in offsets partition %d.".format(offsets, group, partitionFor(group)))
-    offsets.foreach { case (topicAndPartition, offsetAndMetadata) =>
-      putOffset(GroupTopicPartition(group, topicAndPartition), offsetAndMetadata)
+  /*
+   * Check if the offset metadata length is valid
+   */
+  def validateOffsetMetadataLength(metadata: String) : Boolean = {
+    metadata == null || metadata.length() <= config.maxMetadataSize
+  }
+
+  /**
+   * Store offsets by appending them to the replicated log and then inserting them into the cache
+   */
+  // TODO: the generation id and consumer id will be needed by the coordinator for consumer checking in the future
+  def storeOffsets(groupName: String,
+                   consumerId: String,
+                   generationId: Int,
+                   offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
+                   responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+
+    // first filter out partitions with offset metadata size exceeding limit
+    // TODO: in the future we may want to support only atomic commits and hence fail the whole commit if any entry is invalid
+    val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
+
+    // construct the message set to append
+    val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+      new Message(
+        key = OffsetManager.offsetCommitKey(groupName, topicAndPartition.topic, topicAndPartition.partition),
+        bytes = OffsetManager.offsetCommitValue(offsetAndMetadata)
+      )
+    }.toSeq
+
+    val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupName))
+
+    val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
+      new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
+
+    // define the callback that inserts the offsets into the cache once the log append has completed
+    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+      // the append response should only contain the offsets topic partition
+      if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
+        throw new IllegalStateException("Append status %s should only have one partition %s"
+          .format(responseStatus, offsetTopicPartition))
+
+      // construct the commit response status and insert
+      // the offsets and metadata into the cache iff the append status has no error
+      val status = responseStatus(offsetTopicPartition)
+
+      val responseCode =
+        if (status.error == ErrorMapping.NoError) {
+          filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+            putOffset(GroupTopicPartition(groupName, topicAndPartition), offsetAndMetadata)
+          }
+          ErrorMapping.NoError
+        } else {
+          debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
+            .format(filteredOffsetMetadata, groupName, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+
+          // transform the log append error code into the corresponding commit status error code
+          if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)
+            ErrorMapping.ConsumerCoordinatorNotAvailableCode
+          else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
+            ErrorMapping.NotCoordinatorForConsumerCode
+          else
+            status.error
+        }
+
+
+      // compute the final error codes for the commit response
+      val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+          (topicAndPartition, responseCode)
+        else
+          (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+      }
+
+      // finally trigger the callback logic passed from the API layer
+      responseCallback(commitStatus)
+    }
+
+    // call replica manager to append the offset messages
+    replicaManager.appendMessages(
+      config.offsetCommitTimeoutMs.toLong,
+      config.offsetCommitRequiredAcks,
+      offsetsAndMetadataMessageSet,
+      putCacheCallback)
   }
 
   /**

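A stand-alone sketch (Scala) of the error translation performed in putCacheCallback above: an append failure on the offsets topic is rewritten into a coordinator-oriented commit status, and entries with oversized metadata are rejected up front. The constants below are local stand-ins rather than the actual ErrorMapping object; their values are chosen to match the Kafka wire-protocol error codes.

object OffsetCommitErrorSketch {

  // local stand-ins for ErrorMapping constants (values follow the Kafka wire protocol)
  val NoError: Short = 0
  val UnknownTopicOrPartitionCode: Short = 3
  val NotLeaderForPartitionCode: Short = 6
  val OffsetMetadataTooLargeCode: Short = 12
  val ConsumerCoordinatorNotAvailableCode: Short = 15
  val NotCoordinatorForConsumerCode: Short = 16

  // translate the log append error into the commit response code, as in the diff above
  def commitErrorFor(appendError: Short): Short = appendError match {
    case UnknownTopicOrPartitionCode => ConsumerCoordinatorNotAvailableCode
    case NotLeaderForPartitionCode   => NotCoordinatorForConsumerCode
    case other                       => other
  }

  def main(args: Array[String]): Unit = {
    val maxMetadataSize = 4096
    val appendError = NotLeaderForPartitionCode   // pretend the offsets-topic append failed
    val responseCode = commitErrorFor(appendError)

    // oversized metadata entries are rejected up front; everything else shares the append result
    val requested = Map("t-0" -> "ok", "t-1" -> ("x" * 5000))
    val commitStatus = requested.map { case (tp, metadata) =>
      if (metadata.length <= maxMetadataSize) tp -> responseCode
      else tp -> OffsetMetadataTooLargeCode
    }
    println(commitStatus)  // Map(t-0 -> 16, t-1 -> 12)
  }
}
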
http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
deleted file mode 100644
index d4a7d4a..0000000
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.Pool
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-
-import java.util.concurrent.TimeUnit
-
-/**
- * The purgatory holding delayed producer requests
- */
-class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
-  extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
-  this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
-
-  private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
-    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
-  }
-
-  private val producerRequestMetricsForKey = {
-    val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
-    new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
-  }
-
-  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
-
-  private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
-    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
-    List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
-  }
-
-  /**
-   * Check if a specified delayed fetch request is satisfied
-   */
-  def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
-
-  /**
-   * When a delayed produce request expires answer it with possible time out error codes
-   */
-  def expire(delayedProduce: DelayedProduce) {
-    debug("Expiring produce request %s.".format(delayedProduce.produce))
-    for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
-      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
-    respond(delayedProduce)
-  }
-
-  // TODO: purgatory should not be responsible for sending back the responses
-  def respond(delayedProduce: DelayedProduce) {
-    val response = delayedProduce.respond(offsetManager)
-    requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response)))
-  }
-}

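The deletion above (together with FetchRequestPurgatory.scala) removes the per-request-type purgatories that knew how to build and send responses themselves. Below is a rough Scala sketch of the direction the refactor takes instead, under the assumption, suggested by the removed "purgatory should not be responsible for sending back the responses" TODO and by the callback parameters added elsewhere in this patch, that each delayed operation now carries a caller-supplied completion callback. All names are illustrative and are not the actual RequestPurgatory classes.

object DelayedOperationSketch {

  // a delayed operation carries its own completion callback instead of relying on a
  // request-type-specific purgatory to build and send the response
  abstract class DelayedOperation(onComplete: () => Unit) {
    def tryComplete(): Boolean                    // check the condition; complete if satisfied
    protected def forceComplete(): Unit = onComplete()
  }

  // a produce-style operation: completes once enough replicas have acknowledged
  class DelayedAck(requiredAcks: Int, acksReceived: () => Int, onComplete: () => Unit)
      extends DelayedOperation(onComplete) {
    def tryComplete(): Boolean =
      if (acksReceived() >= requiredAcks) { forceComplete(); true } else false
  }

  def main(args: Array[String]): Unit = {
    var acks = 1
    val op = new DelayedAck(requiredAcks = 2, acksReceived = () => acks,
      onComplete = () => println("sending response via the caller-supplied callback"))
    println(op.tryComplete())   // false: only one ack so far
    acks = 2
    println(op.tryComplete())   // true: the callback fires and the response is sent
  }
}
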
