kafka-commits mailing list archives

From jjko...@apache.org
Subject [1/2] KAFKA-1583; Encapsulate replicated log implementation details into ReplicaManager and refactor KafkaApis; reviewed by Joel Koshy and Jun Rao
Date Thu, 30 Oct 2014 01:57:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 20f5b01fe -> 89831204c


http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 78b7514..02fa382 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -20,11 +20,11 @@ import kafka.api._
 import kafka.common._
 import kafka.utils._
 import kafka.cluster.{Broker, Partition, Replica}
-import kafka.log.LogManager
+import kafka.log.{LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.common.TopicAndPartition
-import kafka.message.MessageSet
+import kafka.message.{ByteBufferMessageSet, MessageSet}
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
@@ -39,14 +39,31 @@ import scala.Some
 import org.I0Itec.zkclient.ZkClient
 import com.yammer.metrics.core.Gauge
 
-object ReplicaManager {
-  val HighWatermarkFilename = "replication-offset-checkpoint"
+/*
+ * Result metadata of a log append operation on the log
+ */
+case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) {
+  def errorCode = error match {
+    case None => ErrorMapping.NoError
+    case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+  }
 }
 
-case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata)
+/*
+ * Result metadata of a log read operation on the log
+ */
+case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, error: Option[Throwable] = None) {
+  def errorCode = error match {
+    case None => ErrorMapping.NoError
+    case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+  }
+}
 
+object ReplicaManager {
+  val HighWatermarkFilename = "replication-offset-checkpoint"
+}
 
-class ReplicaManager(val config: KafkaConfig, 
+class ReplicaManager(val config: KafkaConfig,
                      time: Time, 
                      val zkClient: ZkClient, 
                      scheduler: Scheduler,
@@ -64,8 +81,9 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
 
-  var producerRequestPurgatory: ProducerRequestPurgatory = null
-  var fetchRequestPurgatory: FetchRequestPurgatory = null
+  val producerRequestPurgatory = new RequestPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
+  val fetchRequestPurgatory = new RequestPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
+
 
   newGauge(
     "LeaderCount",
@@ -100,37 +118,27 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Initialize the replica manager with the request purgatory
+   * Try to complete some delayed produce requests with the request key;
+   * this can be triggered when:
    *
-   * TODO: will be removed in 0.9 where we refactor server structure
-   */
-
-  def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) {
-    this.producerRequestPurgatory = producerRequestPurgatory
-    this.fetchRequestPurgatory = fetchRequestPurgatory
-  }
-
-  /**
-   * Unblock some delayed produce requests with the request key
+   * 1. The partition HW has changed (for acks = -1)
+   * 2. A follower replica's fetch operation is received (for acks > 1)
    */
-  def unblockDelayedProduceRequests(key: DelayedRequestKey) {
-    val satisfied = producerRequestPurgatory.update(key)
-    debug("Request key %s unblocked %d producer requests."
-      .format(key.keyLabel, satisfied.size))
-
-    // send any newly unblocked responses
-    satisfied.foreach(producerRequestPurgatory.respond(_))
+  def tryCompleteDelayedProduce(key: DelayedRequestKey) {
+    val completed = producerRequestPurgatory.checkAndComplete(key)
+    debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
   }
 
   /**
-   * Unblock some delayed fetch requests with the request key
+   * Try to complete some delayed fetch requests with the request key;
+   * this can be triggered when:
+   *
+   * 1. The partition HW has changed (for regular fetch)
+   * 2. A new message set is appended to the local log (for follower fetch)
    */
-  def unblockDelayedFetchRequests(key: DelayedRequestKey) {
-    val satisfied = fetchRequestPurgatory.update(key)
-    debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size))
-
-    // send any newly unblocked responses
-    satisfied.foreach(fetchRequestPurgatory.respond(_))
+  def tryCompleteDelayedFetch(key: DelayedRequestKey) {
+    val completed = fetchRequestPurgatory.checkAndComplete(key)
+    debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
   }
 
   def startup() {
@@ -237,74 +245,205 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Read from all the offset details given and return a map of
-   * (topic, partition) -> PartitionData
+   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
+   * the callback function will be triggered either when the timeout expires or the required acks are satisfied
    */
-  def readMessageSets(fetchRequest: FetchRequest) = {
-    val isFetchFromFollower = fetchRequest.isFromFollower
-    fetchRequest.requestInfo.map
-    {
-      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionDataAndOffsetInfo =
-          try {
-            val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
-            if (isFetchFromFollower) {
-              debug("Partition [%s,%d] received fetch request from follower %d"
-                .format(topic, partition, fetchRequest.replicaId))
-            }
-            new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset)
-          } catch {
-            // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
-            // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
-            // for a partition it is the leader for
-            case utpe: UnknownTopicOrPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
-              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
-            case nle: NotLeaderForPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
-              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
-            case t: Throwable =>
-              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
-              BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
-                .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
-              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
-          }
-        (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
+  def appendMessages(timeout: Long,
+                     requiredAcks: Short,
+                     messagesPerPartition: Map[TopicAndPartition, MessageSet],
+                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
+
+    val sTime = SystemTime.milliseconds
+    val localProduceResults = appendToLocalLog(messagesPerPartition, requiredAcks)
+    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
+    val produceStatus = localProduceResults.map{ case (topicAndPartition, result) =>
+      topicAndPartition ->
+        ProducePartitionStatus(
+          result.info.lastOffset + 1, // required offset
+          ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
+    }
+
+    if(requiredAcks == 0 ||
+      requiredAcks == 1 ||
+      messagesPerPartition.size <= 0 ||
+      localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) {
+      // in case of the following we can respond immediately:
+      //
+      // 1. required acks = 0 or 1
+      // 2. there is no data to append
+      // 3. all partition appends have failed
+      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
+      responseCallback(produceResponseStatus)
+    } else {
+      // create delayed produce operation
+      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
+      val delayedProduce =  new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+
+      // create a list of (topic, partition) pairs to use as keys for this delayed request
+      val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionRequestKey(_)).toSeq
+
+      // try to complete the request immediately, otherwise put it into the purgatory
+      // this is because while the delayed request is being created, new requests may
+      // arrive which can make this request completable.
+      producerRequestPurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+    }
+  }
+
+  /**
+   * Append the messages to the local replica logs
+   */
+  private def appendToLocalLog(messagesPerPartition: Map[TopicAndPartition, MessageSet],
+                               requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = {
+    trace("Append [%s] to local log ".format(messagesPerPartition))
+    messagesPerPartition.map { case (topicAndPartition, messages) =>
+      try {
+        val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition)
+        val info = partitionOpt match {
+          case Some(partition) =>
+            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
+          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+            .format(topicAndPartition, localBrokerId))
+        }
+
+        val numAppendedMessages =
+          if (info.firstOffset == -1L || info.lastOffset == -1L)
+            0
+          else
+            info.lastOffset - info.firstOffset + 1
+
+        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
+        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+
+        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
+          .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
+        (topicAndPartition, LogAppendResult(info))
+      } catch {
+        // NOTE: Failed produce requests metric is not incremented for known exceptions since it
+        // is supposed to indicate unexpected failures of a broker in handling a produce request
+        case e: KafkaStorageException =>
+          fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
+          Runtime.getRuntime.halt(1)
+          (topicAndPartition, null)
+        case utpe: UnknownTopicOrPartitionException =>
+          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
+        case nle: NotLeaderForPartitionException =>
+          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
+        case e: Throwable =>
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
+          error("Error processing append operation on partition %s".format(topicAndPartition), e)
+          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
+      }
+    }
+  }
+
+  /**
+   * Fetch messages from the leader replica, and wait until enough data can be fetched and return;
+   * the callback function will be triggered either when the timeout expires or the required fetch info is satisfied
+   */
+  def fetchMessages(timeout: Long,
+                    replicaId: Int,
+                    fetchMinBytes: Int,
+                    fetchInfo: Map[TopicAndPartition, PartitionFetchInfo],
+                    responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
+
+    val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
+    val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
+
+    // read from local logs
+    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
+
+    // if the fetch comes from the follower,
+    // update its corresponding log end offset
+    if(Request.isValidBrokerId(replicaId))
+      updateFollowerLEOs(replicaId, logReadResults.mapValues(_.info.fetchOffset))
+
+    // check if this fetch request can be satisfied right away
+    val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
+    val errorReadingData = logReadResults.values.foldLeft(false) ((errorIncurred, readResult) =>
+      errorIncurred || (readResult.errorCode != ErrorMapping.NoError))
+
+    // respond immediately if 1) fetch request does not want to wait
+    //                        2) fetch request does not require any data
+    //                        3) has enough data to respond
+    //                        4) some error happens while reading data
+    if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) {
+      val fetchPartitionData = logReadResults.mapValues(result =>
+        FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
+      responseCallback(fetchPartitionData)
+    } else {
+      // construct the fetch results from the read results
+      val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
+        (topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get))
+      }
+      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus)
+      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
+
+      // create a list of (topic, partition) pairs to use as keys for this delayed request
+      val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionRequestKey(_)).toSeq
+
+      // try to complete the request immediately, otherwise put it into the purgatory;
+      // this is because while the delayed request is being created, new requests may
+      // arrive which can make this request completable.
+      fetchRequestPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
     }
   }
 
   /**
    * Read from a single topic/partition at the given offset upto maxSize bytes
    */
-  private def readMessageSet(topic: String,
-                             partition: Int,
-                             offset: Long,
-                             maxSize: Int,
-                             fromReplicaId: Int): (FetchDataInfo, Long) = {
-    // check if the current broker is the leader for the partitions
-    val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
-      getReplicaOrException(topic, partition)
-    else
-      getLeaderReplicaIfLocal(topic, partition)
-    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val maxOffsetOpt =
-      if (Request.isValidBrokerId(fromReplicaId))
-        None
-      else
-        Some(localReplica.highWatermark.messageOffset)
-    val fetchInfo = localReplica.log match {
-      case Some(log) =>
-        log.read(offset, maxSize, maxOffsetOpt)
-      case None =>
-        error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
-        FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+  def readFromLocalLog(fetchOnlyFromLeader: Boolean,
+                       readOnlyCommitted: Boolean,
+                       readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = {
+
+    readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+      val partitionDataAndOffsetInfo =
+        try {
+          trace("Fetching log segment for topic %s, partition %d, offset %d, size %d".format(topic, partition, offset, fetchSize))
+
+          // decide whether to only fetch from leader
+          val localReplica = if (fetchOnlyFromLeader)
+            getLeaderReplicaIfLocal(topic, partition)
+          else
+            getReplicaOrException(topic, partition)
+
+          // decide whether to only fetch committed data (i.e. messages below high watermark)
+          val maxOffsetOpt = if (readOnlyCommitted)
+            Some(localReplica.highWatermark.messageOffset)
+          else
+            None
+
+          // read on log
+          val logReadInfo = localReplica.log match {
+            case Some(log) =>
+              log.read(offset, fetchSize, maxOffsetOpt)
+            case None =>
+              error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
+              FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+          }
+
+          LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, None)
+        } catch {
+          // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
+          // is supposed to indicate unexpected failure of a broker in handling a fetch request
+          case utpe: UnknownTopicOrPartitionException =>
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(utpe))
+          case nle: NotLeaderForPartitionException =>
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle))
+          case rnae: ReplicaNotAvailableException =>
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae))
+          case e: Throwable =>
+            BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+            BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
+            error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(e))
+        }
+      (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
     }
-    (fetchInfo, localReplica.highWatermark.messageOffset)
   }
 
   def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
@@ -557,32 +696,27 @@ class ReplicaManager(val config: KafkaConfig,
     allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }
 
-  def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = {
-    getPartition(topic, partitionId) match {
-      case Some(partition) =>
-        partition.getReplica(replicaId) match {
-          case Some(replica) =>
-            replica.logEndOffset = offset
-            // check if we need to update HW and expand Isr
-            partition.updateLeaderHWAndMaybeExpandIsr(replicaId)
-            debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId))
-          case None =>
-            throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
-              " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
-              offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
-
-        }
-      case None =>
-        warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
+  private def updateFollowerLEOs(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
+    debug("Recording follower broker %d log end offsets: %s ".format(replicaId, offsets))
+    offsets.foreach { case (topicAndPartition, offset) =>
+      getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
+        case Some(partition) =>
+          partition.updateReplicaLEO(replicaId, offset)
+
+          // for producer requests with ack > 1, we need to check
+          // if they can be unblocked after some follower's log end offsets have moved
+          tryCompleteDelayedProduce(new TopicPartitionRequestKey(topicAndPartition))
+        case None =>
+          warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition))
+      }
     }
   }
 
   private def getLeaderPartitions() : List[Partition] = {
     allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList
   }
-  /**
-   * Flushes the highwatermark value for all partitions to the highwatermark file
-   */
+
+  // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
@@ -598,10 +732,14 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def shutdown() {
-    info("Shut down")
+  // Checkpointing the high watermarks can be skipped only under unit tests
+  def shutdown(checkpointHW: Boolean = true) {
+    info("Shutting down")
     replicaFetcherManager.shutdown()
-    checkpointHighWatermarks()
+    fetchRequestPurgatory.shutdown()
+    producerRequestPurgatory.shutdown()
+    if (checkpointHW)
+      checkpointHighWatermarks()
     info("Shut down completely")
   }
 }
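
The refactored ReplicaManager above exposes appendMessages() and fetchMessages() that take a response callback instead of returning data to the caller directly; a request is either answered immediately or parked in the purgatory until it completes or expires. Below is a minimal, self-contained Scala sketch of that callback style; TopicPartition, PartitionResponse and CallbackBasedAppender are simplified stand-ins invented for illustration, not the actual Kafka classes.

    // Hypothetical, simplified stand-ins -- not the Kafka classes from the diff above
    case class TopicPartition(topic: String, partition: Int)
    case class PartitionResponse(errorCode: Short, baseOffset: Long)

    class CallbackBasedAppender {
      // Mirrors the shape of appendMessages(): the caller supplies a callback and the
      // appender decides whether to answer immediately or (in Kafka) via the purgatory.
      def append(messages: Map[TopicPartition, Array[Byte]],
                 requiredAcks: Short,
                 responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
        // pretend every partition append succeeds at offset 0
        val results = messages.map { case (tp, _) => tp -> PartitionResponse(0, 0L) }
        if (requiredAcks == 0 || requiredAcks == 1 || results.isEmpty) {
          // fast path: answer immediately, mirroring the acks = 0/1 branch above
          responseCallback(results)
        } else {
          // the real ReplicaManager would create a DelayedProduce and hand it to the purgatory;
          // this sketch just completes it right away to stay self-contained
          responseCallback(results)
        }
      }
    }

    object CallbackAppendExample extends App {
      val appender = new CallbackBasedAppender
      appender.append(
        Map(TopicPartition("test", 0) -> "hello".getBytes("UTF-8")),
        requiredAcks = 1,
        responseCallback = statuses =>
          statuses.foreach { case (tp, r) => println(s"$tp -> error=${r.errorCode} offset=${r.baseOffset}") })
    }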

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 9d76234..323b12e 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
 
@@ -30,50 +29,75 @@ import com.yammer.metrics.core.Gauge
 
 
 /**
- * A request whose processing needs to be delayed for at most the given delayMs
- * The associated keys are used for bookeeping, and represent the "trigger" that causes this request to check if it is satisfied,
- * for example a key could be a (topic, partition) pair.
+ * An operation whose processing needs to be delayed for at most the given delayMs. For example
+ * a delayed produce operation could be waiting for specified number of acks; or
+ * a delayed fetch operation could be waiting for a given number of bytes to accumulate.
+ *
+ * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
+ * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
+ * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
+ * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
+ * forceComplete().
+ *
+ * A subclass of DelayedRequest needs to provide an implementation of both onComplete() and tryComplete().
  */
-class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
-  val satisfied = new AtomicBoolean(false)
+abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) {
+  private val completed = new AtomicBoolean(false)
+
+  /*
+   * Force completing the delayed operation, if not already completed.
+   * This function can be triggered when
+   *
+   * 1. The operation has been verified to be completable inside tryComplete()
+   * 2. The operation has expired and hence needs to be completed right now
+   *
+   * Return true iff the operation is completed by the caller
+   */
+  def forceComplete(): Boolean = {
+    if (completed.compareAndSet(false, true)) {
+      onComplete()
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Check if the delayed operation is already completed
+   */
+  def isCompleted(): Boolean = completed.get()
+
+  /**
+   * Process for completing an operation; this function needs to be defined in subclasses
+   * and will be called exactly once in forceComplete()
+   */
+  def onComplete(): Unit
+
+  /*
+   * Try to complete the delayed operation by first checking if the operation
+   * can be completed by now. If yes execute the completion logic by calling
+   * forceComplete() and return true iff forceComplete returns true; otherwise return false
+   *
+   * Note that concurrent threads can check if an operation can be completed or not,
+   * but only the first thread will succeed in completing the operation and return
+   * true, others will still return false
+   *
+   * this function needs to be defined in subclasses
+   */
+  def tryComplete(): Boolean
 }
 
 /**
- * A helper class for dealing with asynchronous requests with a timeout. A DelayedRequest has a request to delay
- * and also a list of keys that can trigger the action. Implementations can add customized logic to control what it means for a given
- * request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition)
- * to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request
- * to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
- *
- * For us the key is generally a (topic, partition) pair.
- * By calling 
- *   val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest)
- * we will check if a request is satisfied already, and if not add the request for watch on all its keys.
- *
- * It is up to the user to then call
- *   val satisfied = update(key, request) 
- * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
- * new request.
- *
- * An implementation provides extends two helper functions
- *   def checkSatisfied(request: R, delayed: T): Boolean
- * this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
- * request delayed. This method will likely also need to do whatever bookkeeping is necessary.
- *
- * The second function is
- *   def expire(delayed: T)
- * this function handles delayed requests that have hit their time limit without being satisfied.
- *
+ * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
  */
-abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
+class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
         extends Logging with KafkaMetricsGroup {
 
   /* a list of requests watching each key */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
 
   /* background thread expiring requests that have been waiting too long */
-  private val expiredRequestReaper = new ExpiredRequestReaper
-  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
+  private val expirationReaper = new ExpiredOperationReaper
 
   newGauge(
     "PurgatorySize",
@@ -89,230 +113,182 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
     }
   )
 
-  expirationThread.start()
+  expirationReaper.start()
 
   /**
-   * Try to add the request for watch on all keys. Return true iff the request is
-   * satisfied and the satisfaction is done by the caller.
+   * Check if the operation can be completed, if not watch it based on the given watch keys
+   *
+   * Note that a delayed operation can be watched on multiple keys. It is possible that
+   * an operation is completed after it has been added to the watch list for some, but
+   * not all of the keys. In this case, the operation is considered completed and won't
+   * be added to the watch list of the remaining keys. The expiration reaper thread will
+   * remove this operation from any watcher list in which the operation exists.
    *
-   * Requests can be watched on only a few of the keys if it is found satisfied when
-   * trying to add it to each one of the keys. In this case the request is still treated as satisfied
-   * and hence no longer watched. Those already added elements will be later purged by the expire reaper.
+   * @param operation the delayed operation to be checked
+   * @param watchKeys keys for bookkeeping the operation
+   * @return true iff the delayed operation can be completed by the caller
    */
-  def checkAndMaybeWatch(delayedRequest: T): Boolean = {
-    for(key <- delayedRequest.keys) {
-      val lst = watchersFor(key)
-      if(!lst.checkAndMaybeAdd(delayedRequest)) {
-        if(delayedRequest.satisfied.compareAndSet(false, true))
-          return true
-        else
-          return false
+  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
+    for(key <- watchKeys) {
+      // if the operation is already completed, stop adding it to
+      // any further lists and return false
+      if (operation.isCompleted())
+        return false
+      val watchers = watchersFor(key)
+      // if the operation can be completed by the caller, stop adding it to
+      // any further lists and return true immediately
+      if(operation synchronized operation.tryComplete()) {
+        return true
+      } else {
+        watchers.watch(operation)
       }
     }
 
-    // if it is indeed watched, add to the expire queue also
-    expiredRequestReaper.enqueue(delayedRequest)
+    // if it cannot be completed by now and hence is watched, add to the expire queue also
+    if (! operation.isCompleted()) {
+      expirationReaper.enqueue(operation)
+    }
 
     false
   }
 
   /**
-   * Update any watchers and return a list of newly satisfied requests.
+   * Check if some delayed requests can be completed with the given watch key,
+   * and if yes complete them.
+   *
+   * @return the number of completed requests during this process
    */
-  def update(key: Any): Seq[T] = {
-    val w = watchersForKey.get(key)
-    if(w == null)
-      Seq.empty
+  def checkAndComplete(key: Any): Int = {
+    val watchers = watchersForKey.get(key)
+    if(watchers == null)
+      0
     else
-      w.collectSatisfiedRequests()
+      watchers.tryCompleteWatched()
   }
 
-  /*
-   * Return the size of the watched lists in the purgatory, which is the size of watch lists.
-   * Since an operation may still be in the watch lists even when it has been completed,
-   * this number may be larger than the number of real operations watched
+  /**
+   * Return the total size of the watch lists in the purgatory. Since an operation may be watched
+   * on multiple lists, and some of its watched entries may still be in the watch lists
+   * even when it has been completed, this number may be larger than the number of real operations watched
    */
   def watched() = watchersForKey.values.map(_.watched).sum
 
-  /*
-   * Return the number of requests in the expiry reaper's queue
+  /**
+   * Return the number of delayed operations in the expiry queue
    */
-  def delayed() = expiredRequestReaper.delayed()
+  def delayed() = expirationReaper.delayed
 
   /*
-   * Return the watch list for the given watch key
+   * Return the watch list of the given key
    */
   private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
-  
-  /**
-   * Check if this delayed request is already satisfied
-   */
-  protected def checkSatisfied(request: T): Boolean
-
-  /**
-   * Handle an expired delayed request
-   */
-  protected def expire(delayed: T)
 
   /**
    * Shutdown the expire reaper thread
    */
   def shutdown() {
-    expiredRequestReaper.shutdown()
+    expirationReaper.shutdown()
   }
 
   /**
-   * A linked list of DelayedRequests watching some key with some associated
-   * bookkeeping logic.
+   * A linked list of watched delayed operations based on some key
    */
   private class Watchers {
     private val requests = new util.LinkedList[T]
 
-    // return the size of the watch list
-    def watched() = requests.size()
+    def watched = requests.size()
 
-    // potentially add the element to watch if it is not satisfied yet
-    def checkAndMaybeAdd(t: T): Boolean = {
+    // add the element to watch
+    def watch(t: T) {
       synchronized {
-        // if it is already satisfied, do not add to the watch list
-        if (t.satisfied.get)
-          return false
-        // synchronize on the delayed request to avoid any race condition
-        // with expire and update threads on client-side.
-        if(t synchronized checkSatisfied(t)) {
-          return false
-        }
         requests.add(t)
-        return true
       }
     }
 
-    // traverse the list and purge satisfied elements
-    def purgeSatisfied(): Int = {
+    // traverse the list and try to complete some watched elements
+    def tryCompleteWatched(): Int = {
+      var completed = 0
       synchronized {
         val iter = requests.iterator()
-        var purged = 0
         while(iter.hasNext) {
           val curr = iter.next
-          if(curr.satisfied.get()) {
+          if (curr.isCompleted()) {
+            // another thread has completed this request, just remove it
             iter.remove()
-            purged += 1
+          } else {
+            if(curr synchronized curr.tryComplete()) {
+              iter.remove()
+              completed += 1
+            }
           }
         }
-        purged
       }
+      completed
     }
 
-    // traverse the list and try to satisfy watched elements
-    def collectSatisfiedRequests(): Seq[T] = {
-      val response = new mutable.ArrayBuffer[T]
+    // traverse the list and purge elements that are already completed by others
+    def purgeCompleted(): Int = {
+      var purged = 0
       synchronized {
         val iter = requests.iterator()
-        while(iter.hasNext) {
+        while (iter.hasNext) {
           val curr = iter.next
-          if(curr.satisfied.get) {
-            // another thread has satisfied this request, remove it
+          if(curr.isCompleted()) {
             iter.remove()
-          } else {
-            // synchronize on curr to avoid any race condition with expire
-            // on client-side.
-            val satisfied = curr synchronized checkSatisfied(curr)
-            if(satisfied) {
-              iter.remove()
-              val updated = curr.satisfied.compareAndSet(false, true)
-              if(updated == true) {
-                response += curr
-              }
-            }
+            purged += 1
           }
         }
       }
-      response
+      purged
     }
   }
 
   /**
-   * Runnable to expire requests that have sat unfullfilled past their deadline
+   * A background reaper to expire delayed operations that have timed out
    */
-  private class ExpiredRequestReaper extends Runnable with Logging {
-    this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
-    private val running = new AtomicBoolean(true)
-    private val shutdownLatch = new CountDownLatch(1)
+  private class ExpiredOperationReaper extends ShutdownableThread(
+    "ExpirationReaper-%d".format(brokerId),
+    false) {
 
+    /* The queue storing all delayed operations */
     private val delayedQueue = new DelayQueue[T]
 
+    /*
+     * Return the number of delayed operations kept by the reaper
+     */
     def delayed() = delayedQueue.size()
-    
-    /** Main loop for the expiry thread */
-    def run() {
-      while(running.get) {
-        try {
-          val curr = pollExpired()
-          if (curr != null) {
-            curr synchronized {
-              expire(curr)
-            }
-          }
-          // see if we need to purge the watch lists
-          if (RequestPurgatory.this.watched() >= purgeInterval) {
-            debug("Begin purging watch lists")
-            val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
-            debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers))
-          }
-          // see if we need to purge the delayed request queue
-          if (delayed() >= purgeInterval) {
-            debug("Begin purging delayed queue")
-            val purged = purgeSatisfied()
-            debug("Purged %d requests from delayed queue.".format(purged))
-          }
-        } catch {
-          case e: Exception =>
-            error("Error in long poll expiry thread: ", e)
-        }
-      }
-      shutdownLatch.countDown()
-    }
 
-    /** Add a request to be expired */
+    /*
+     * Add an operation to be expired
+     */
     def enqueue(t: T) {
       delayedQueue.add(t)
     }
 
-    /** Shutdown the expiry thread*/
-    def shutdown() {
-      debug("Shutting down.")
-      running.set(false)
-      shutdownLatch.await()
-      debug("Shut down complete.")
-    }
-
     /**
-     * Get the next expired event
+     * Try to get the next expired event and force completing it
      */
-    private def pollExpired(): T = {
-      while(true) {
-        val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
-        if (curr == null)
-          return null.asInstanceOf[T]
-        val updated = curr.satisfied.compareAndSet(false, true)
-        if(updated) {
-          return curr
+    private def expireNext() {
+      val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
+      if (curr != null.asInstanceOf[T]) {
+        // if there is an expired operation, try to force complete it
+        if (curr synchronized curr.forceComplete()) {
+          debug("Force complete expired delayed operation %s".format(curr))
         }
       }
-      throw new RuntimeException("This should not happen")
     }
 
     /**
      * Delete all satisfied events from the delay queue and the watcher lists
      */
-    private def purgeSatisfied(): Int = {
+    private def purgeCompleted(): Int = {
       var purged = 0
 
       // purge the delayed queue
       val iter = delayedQueue.iterator()
-      while(iter.hasNext) {
+      while (iter.hasNext) {
         val curr = iter.next()
-        if(curr.satisfied.get) {
+        if (curr.isCompleted()) {
           iter.remove()
           purged += 1
         }
@@ -320,6 +296,22 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
 
       purged
     }
-  }
 
+    override def doWork() {
+      // try to get the next expired operation and force completing it
+      expireNext()
+      // see if we need to purge the watch lists
+      if (RequestPurgatory.this.watched() >= purgeInterval) {
+        debug("Begin purging watch lists")
+        val purged = watchersForKey.values.map(_.purgeCompleted()).sum
+        debug("Purged %d elements from watch lists.".format(purged))
+      }
+      // see if we need to purge the delayed request queue
+      if (delayed() >= purgeInterval) {
+        debug("Begin purging delayed queue")
+        val purged = purgeCompleted()
+        debug("Purged %d operations from delayed queue.".format(purged))
+      }
+    }
+  }
 }
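
The new DelayedRequest above replaces the satisfied flag plus external checkSatisfied()/expire() hooks with a self-contained completion contract: tryComplete() checks the condition, forceComplete() guarantees onComplete() runs exactly once via a compare-and-set, and the expiration reaper simply calls forceComplete() on timeout. Here is a self-contained sketch of that contract under simplified names; DelayedOperationSketch and ConditionOperation are illustrative stand-ins, not the Kafka classes.

    import java.util.concurrent.atomic.AtomicBoolean

    // Simplified restatement of the completion contract: onComplete() runs exactly once,
    // triggered by either tryComplete() or expiration via forceComplete().
    abstract class DelayedOperationSketch {
      private val completed = new AtomicBoolean(false)

      def isCompleted: Boolean = completed.get()

      // run the completion logic exactly once, whichever caller wins the CAS
      def forceComplete(): Boolean =
        if (completed.compareAndSet(false, true)) { onComplete(); true } else false

      def onComplete(): Unit
      def tryComplete(): Boolean
    }

    // Example: an operation that completes once an external condition is flagged
    class ConditionOperation extends DelayedOperationSketch {
      @volatile var conditionMet = false
      override def tryComplete(): Boolean = if (conditionMet) forceComplete() else false
      override def onComplete(): Unit = println("completed exactly once")
    }

    object DelayedOperationDemo extends App {
      val op = new ConditionOperation
      assert(!op.tryComplete())   // condition not met yet, stays watched
      op.conditionMet = true
      assert(op.tryComplete())    // first completer wins
      assert(!op.forceComplete()) // expiration after completion is a no-op
      assert(op.isCompleted)
    }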

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/utils/DelayedItem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala
index d727649..a4e0dab 100644
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ b/core/src/main/scala/kafka/utils/DelayedItem.scala
@@ -20,7 +20,7 @@ package kafka.utils
 import java.util.concurrent._
 import scala.math._
 
-class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed with Logging {
+class DelayedItem(delay: Long, unit: TimeUnit) extends Delayed with Logging {
 
   val createdMs = SystemTime.milliseconds
   val delayMs = {
@@ -29,8 +29,8 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed w
     else given
   }
 
-  def this(item: T, delayMs: Long) = 
-    this(item, delayMs, TimeUnit.MILLISECONDS)
+  def this(delayMs: Long) =
+    this(delayMs, TimeUnit.MILLISECONDS)
 
   /**
    * The remaining delay time
@@ -41,7 +41,7 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed w
   }
     
   def compareTo(d: Delayed): Int = {
-    val delayed = d.asInstanceOf[DelayedItem[T]]
+    val delayed = d.asInstanceOf[DelayedItem]
     val myEnd = createdMs + delayMs
     val yourEnd = delayed.createdMs + delayed.delayMs
 
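After the change above, DelayedItem only tracks timing; subclasses such as DelayedRequest carry their own state. Its purpose is to let operations sit in a java.util.concurrent.DelayQueue and be polled by the expiration reaper. A small stand-alone sketch of that mechanism follows; SimpleDelayedItem is a hypothetical stand-in, not the Kafka class.

    import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

    // Minimal item that becomes available from a DelayQueue once its delay has elapsed
    class SimpleDelayedItem(delayMs: Long) extends Delayed {
      private val dueMs: Long = System.currentTimeMillis + delayMs

      override def getDelay(unit: TimeUnit): Long =
        unit.convert(dueMs - System.currentTimeMillis, TimeUnit.MILLISECONDS)

      override def compareTo(other: Delayed): Int =
        java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
    }

    object DelayQueueDemo extends App {
      val queue = new DelayQueue[SimpleDelayedItem]()
      queue.add(new SimpleDelayedItem(50))
      // poll with a timeout, much like the reaper's expireNext() above
      val expired = queue.poll(200L, TimeUnit.MILLISECONDS)
      println(s"expired item ready: ${expired != null}")
    }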

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 209a409..8531f53 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -305,13 +305,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
   @Test
   def testNotEnoughReplicas() {
     val topicName = "minisrtest"
-    val topicProps = new Properties();
-    topicProps.put("min.insync.replicas","3");
-
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas","3")
 
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-
     val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
     try {
       producer3.send(record).get
@@ -327,18 +325,16 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
   @Test
   def testNotEnoughReplicasAfterBrokerShutdown() {
     val topicName = "minisrtest2"
-    val topicProps = new Properties();
-    topicProps.put("min.insync.replicas","2");
-
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas","2")
 
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-
     val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
-    // This should work
+    // this should work with all brokers up and running
     producer3.send(record).get
 
-    //shut down one broker
+    // shut down one broker
     servers.head.shutdown()
     servers.head.awaitShutdown()
     try {
@@ -351,8 +347,8 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
         }
     }
 
+    // restart the server
     servers.head.startup()
-
   }
 
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index fb61d55..d60d8e0 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -237,8 +237,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     props.put("request.required.acks", "-1")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    val topicProps = new Properties();
-    topicProps.put("min.insync.replicas","2");
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas","2")
     AdminUtils.createTopic(zkClient, topicName, 1, 1,topicProps)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 03a424d..8913fc1 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -74,6 +74,9 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     fooPartition0Hw = hwmFor(replicaManager, topic, 0)
     assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
     EasyMock.verify(zkClient)
+
+    // shutdown the replica manager upon test completion
+    replicaManager.shutdown(false)
   }
 
   def testHighWatermarkPersistenceMultiplePartitions() {
@@ -130,6 +133,10 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
     assertEquals(10L, topic1Partition0Hw)
     EasyMock.verify(zkClient)
+
+    // shutdown the replica manager upon test completion
+    replicaManager.shutdown(false)
+
   }
 
   def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index cd302aa..a703d27 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -36,8 +36,21 @@ class IsrExpirationTest extends JUnit3Suite {
   })
   val topic = "foo"
 
+  val time = new MockTime
+
+  var replicaManager: ReplicaManager = null
+
+  override def setUp() {
+    super.setUp()
+    replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false))
+  }
+
+  override def tearDown() {
+    replicaManager.shutdown(false)
+    super.tearDown()
+  }
+
   def testIsrExpirationForStuckFollowers() {
-    val time = new MockTime
     val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
 
     // create one partition and all replicas
@@ -61,7 +74,6 @@ class IsrExpirationTest extends JUnit3Suite {
   }
 
   def testIsrExpirationForSlowFollowers() {
-    val time = new MockTime
     // create leader replica
     val log = getLogWithLogEndOffset(15L, 1)
     // add one partition
@@ -82,7 +94,6 @@ class IsrExpirationTest extends JUnit3Suite {
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId=config.brokerId
-    val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false))
     val partition = replicaManager.getOrCreatePartition(topic, partitionId)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a9c4ddc..faa9071 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -42,6 +42,9 @@ class ReplicaManagerTest extends JUnit3Suite {
     val partition = rm.getOrCreatePartition(topic, 1)
     partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
+
+    // shutdown the replica manager upon test completion
+    rm.shutdown(false)
   }
 
   @Test
@@ -56,5 +59,8 @@ class ReplicaManagerTest extends JUnit3Suite {
     val partition = rm.getOrCreatePartition(topic, 1)
     partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
+
+    // shutdown the replica manager upon test completion
+    rm.shutdown(false)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index a577f4a..a7720d5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -17,24 +17,18 @@
 
 package kafka.server
 
-import scala.collection._
 import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
 import junit.framework.Assert._
-import kafka.message._
-import kafka.api._
 import kafka.utils.TestUtils
-import org.scalatest.junit.JUnit3Suite
-
 
 class RequestPurgatoryTest extends JUnit3Suite {
 
-  val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes)))
-  val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes)))
-  var purgatory: MockRequestPurgatory = null
+  var purgatory: RequestPurgatory[MockDelayedRequest] = null
   
   override def setUp() {
     super.setUp()
-    purgatory = new MockRequestPurgatory(5)
+    purgatory = new RequestPurgatory[MockDelayedRequest](0, 5)
   }
   
   override def tearDown() {
@@ -44,58 +38,59 @@ class RequestPurgatoryTest extends JUnit3Suite {
 
   @Test
   def testRequestSatisfaction() {
-    val r1 = new DelayedRequest(Array("test1"), null, 100000L)
-    val r2 = new DelayedRequest(Array("test2"), null, 100000L)
-    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size)
-    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
-    assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size)
-    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
-    assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size)
-    purgatory.satisfied += r1
-    assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1"))
-    assertEquals("Nothing satisfied", 0, purgatory.update("test1").size)
-    purgatory.satisfied += r2
-    assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2"))
-    assertEquals("Nothing satisfied", 0, purgatory.update("test2").size)
+    val r1 = new MockDelayedRequest(100000L)
+    val r2 = new MockDelayedRequest(100000L)
+    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.checkAndComplete("test1"))
+    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
+    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test1"))
+    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
+    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test2"))
+    r1.completable = true
+    assertEquals("r1 satisfied", 1, purgatory.checkAndComplete("test1"))
+    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test1"))
+    r2.completable = true
+    assertEquals("r2 satisfied", 1, purgatory.checkAndComplete("test2"))
+    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test2"))
   }
 
   @Test
   def testRequestExpiry() {
     val expiration = 20L
-    val r1 = new DelayedRequest(Array("test1"), null, expiration)
-    val r2 = new DelayedRequest(Array("test1"), null, 200000L)
+    val r1 = new MockDelayedRequest(expiration)
+    val r2 = new MockDelayedRequest(200000L)
     val start = System.currentTimeMillis
-    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
-    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
-    purgatory.awaitExpiration(r1)
+    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
+    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
+    r1.awaitExpiration()
     val elapsed = System.currentTimeMillis - start
-    assertTrue("r1 expired", purgatory.expired.contains(r1))
-    assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
+    assertTrue("r1 completed due to expiration", r1.isCompleted())
+    assertFalse("r2 hasn't completed", r2.isCompleted())
     assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
   }
 
   @Test
   def testRequestPurge() {
-    val r1 = new DelayedRequest(Array("test1"), null, 100000L)
-    val r12 = new DelayedRequest(Array("test1", "test2"), null, 100000L)
-    val r23 = new DelayedRequest(Array("test2", "test3"), null, 100000L)
-    purgatory.checkAndMaybeWatch(r1)
-    purgatory.checkAndMaybeWatch(r12)
-    purgatory.checkAndMaybeWatch(r23)
+    val r1 = new MockDelayedRequest(100000L)
+    val r2 = new MockDelayedRequest(100000L)
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+    purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
+    purgatory.tryCompleteElseWatch(r1, Array("test2", "test3"))
 
     assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched())
     assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed())
 
-    // satisfy one of the requests, it should then be purged from the watch list with purge interval 5
-    r12.satisfied.set(true)
+    // complete one of the operations, it should
+    // eventually be purged from the watch list with purge interval 5
+    r2.completable = true
+    r2.tryComplete()
     TestUtils.waitUntilTrue(() => purgatory.watched() == 3,
-      "Purgatory should have 3 watched elements instead of " +  + purgatory.watched(), 1000L)
+      "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L)
     TestUtils.waitUntilTrue(() => purgatory.delayed() == 3,
       "Purgatory should still have 3 total delayed requests instead of " + purgatory.delayed(), 1000L)
 
     // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5
-    purgatory.checkAndMaybeWatch(r1)
-    purgatory.checkAndMaybeWatch(r1)
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
 
     TestUtils.waitUntilTrue(() => purgatory.watched() == 5,
       "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L)
@@ -103,19 +98,25 @@ class RequestPurgatoryTest extends JUnit3Suite {
       "Purgatory should have 4 total delayed requests instead of " + purgatory.delayed(), 1000L)
   }
   
-  class MockRequestPurgatory(purge: Int) extends RequestPurgatory[DelayedRequest](purgeInterval = purge) {
-    val satisfied = mutable.Set[DelayedRequest]()
-    val expired = mutable.Set[DelayedRequest]()
-    def awaitExpiration(delayed: DelayedRequest) = {
-      delayed synchronized {
-        delayed.wait()
+  class MockDelayedRequest(delayMs: Long) extends DelayedRequest(delayMs) {
+    var completable = false
+
+    def awaitExpiration() {
+      synchronized {
+        wait()
       }
     }
-    def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
-    def expire(delayed: DelayedRequest) {
-      expired += delayed
-      delayed synchronized {
-        delayed.notify()
+
+    override def tryComplete() = {
+      if (completable)
+        forceComplete()
+      else
+        false
+    }
+
+    override def onComplete() {
+      synchronized {
+        notify()
       }
     }
   }

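The rewritten RequestPurgatoryTest above exercises the refactored contract: a delayed
request now owns its own completion logic (tryComplete/forceComplete/onComplete), and the
purgatory is only asked to watch it on a set of keys (tryCompleteElseWatch) and to re-check
those keys when server state changes (checkAndComplete). The following sketch restates that
contract outside the test; it is not part of the patch, the DelayedFlush/flushDone names are
invented for illustration, and the RequestPurgatory constructor arguments and shutdown()
call are assumptions.

    import kafka.server.{DelayedRequest, RequestPurgatory}

    // A delayed operation that completes once an external flush flag flips to true,
    // or when its delay expires, whichever happens first.
    class DelayedFlush(delayMs: Long, flushDone: () => Boolean, onDone: () => Unit)
      extends DelayedRequest(delayMs) {

      // Invoked by the purgatory whenever one of the watched keys is checked:
      // complete now if the condition already holds, otherwise stay watched.
      override def tryComplete(): Boolean =
        if (flushDone()) forceComplete() else false

      // Runs exactly once, either via forceComplete() above or on expiration.
      override def onComplete() {
        onDone()
      }
    }

    object DelayedFlushDemo {
      def main(args: Array[String]) {
        var flushed = false
        // Constructor arguments (broker id, purge interval) are assumed here.
        val purgatory = new RequestPurgatory[DelayedFlush](0, 5)

        val op = new DelayedFlush(100000L, () => flushed, () => println("flush acknowledged"))
        // Not completable yet, so it is parked and watched under the key "log-flush".
        purgatory.tryCompleteElseWatch(op, Array("log-flush"))

        flushed = true
        // State behind the key has changed; ask the purgatory to retry completion.
        val completed = purgatory.checkAndComplete("log-flush")
        println("completed " + completed + " delayed operation(s); op.isCompleted = " + op.isCompleted())

        purgatory.shutdown()  // stop the background expiration reaper (assumed to exist)
      }
    }
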
http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 3804a11..1bfb501 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -106,7 +106,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     val newProps = TestUtils.createBrokerConfig(0, port)
     newProps.setProperty("delete.topic.enable", "true")
     val newConfig = new KafkaConfig(newProps)
-    var server = new KafkaServer(newConfig)
+    val server = new KafkaServer(newConfig)
     server.startup()
     server.shutdown()
     server.awaitShutdown()

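The ServerShutdownTest hunk is only a var-to-val tidy-up, but the surrounding context shows
the broker lifecycle every test in this file relies on. As a reminder, a minimal sketch (not
part of the patch; it assumes a ZooKeeper instance reachable at the address TestUtils writes
into the generated broker config):

    import kafka.server.{KafkaConfig, KafkaServer}
    import kafka.utils.TestUtils

    val props = TestUtils.createBrokerConfig(0, TestUtils.choosePort())
    props.setProperty("delete.topic.enable", "true")

    val server = new KafkaServer(new KafkaConfig(props))
    server.startup()
    try {
      // exercise the broker here
    } finally {
      server.shutdown()       // request a controlled shutdown
      server.awaitShutdown()  // block until all broker services have stopped
    }
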
http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 09ed8f5..ccf5e2e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -17,18 +17,21 @@
 package kafka.server
 
 import kafka.api._
-import kafka.cluster.{Partition, Replica}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.utils._
+import kafka.cluster.Replica
+import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.network.RequestChannel
-import kafka.utils.{ZkUtils, Time, TestUtils, MockTime}
 
 import scala.Some
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import collection.JavaConversions._
 
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
 
 class SimpleFetchTest extends JUnit3Suite {
 
@@ -37,215 +40,102 @@ class SimpleFetchTest extends JUnit3Suite {
     override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
   })
-  val topic = "foo"
-  val partitionId = 0
 
-  /**
-   * The scenario for this test is that there is one topic, "test-topic", one broker "0" that has
-   * one  partition with one follower replica on broker "1".  The leader replica on "0"
-   * has HW of "5" and LEO of "20".  The follower on broker "1" has a local replica
-   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
-   * but is still in ISR (hasn't yet expired from ISR).
-   *
-   * When a normal consumer fetches data, it should only see data up to the HW of the leader,
-   * in this case up an offset of "5".
-   */
-  def testNonReplicaSeesHwWhenFetching() {
-    /* setup */
-    val time = new MockTime
-    val leo = 20L
-    val hw = 5
-    val fetchSize = 100
-    val messages = new Message("test-message".getBytes())
+  // set the replica manager with the partition
+  val time = new MockTime
+  val leaderLEO = 20L
+  val followerLEO = 15L
+  val partitionHW = 5
 
-    // create nice mock since we don't particularly care about zkclient calls
-    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
-    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
-    EasyMock.replay(zkClient)
+  val fetchSize = 100
+  val messagesToHW = new Message("messageToHW".getBytes())
+  val messagesToLEO = new Message("messageToLEO".getBytes())
 
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
-    EasyMock.expect(log)
-    EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(
-      new FetchDataInfo(
-        new LogOffsetMetadata(0L, 0L, leo.toInt),
-        new ByteBufferMessageSet(messages)
-      )).anyTimes()
-    EasyMock.replay(log)
+  val topic = "test-topic"
+  val partitionId = 0
+  val topicAndPartition = TopicAndPartition(topic, partitionId)
 
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
-    EasyMock.replay(logManager)
+  val fetchInfo = Collections.singletonMap(topicAndPartition, PartitionFetchInfo(0, fetchSize)).toMap
 
-    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
-    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
-    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
-      val fetchInfo = log.read(0, fetchSize, Some(hw))
-      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
-      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
-    }).anyTimes()
-    EasyMock.replay(replicaManager)
-
-    val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
-    partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5)
-
-    EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
-    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
-    EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject()))
-    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
-      val fetchInfo = log.read(0, fetchSize, Some(hw))
-      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
-      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
-    }).anyTimes()
-
-    EasyMock.replay(replicaManager)
-
-    val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
-
-    val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
-
-    // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
-    // don't provide replica or leader callbacks since they will not be tested here
-    val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
-
-    val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
-    EasyMock.replay(partitionStateInfo)
-    // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
-    val goodFetch = new FetchRequestBuilder()
-          .replicaId(Request.OrdinaryConsumerId)
-          .addFetch(topic, partitionId, 0, fetchSize)
-          .build()
-    val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
-
-    // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeMs=1))
-
-    // make sure the log only reads bytes between 0->HW (5)
-    EasyMock.verify(log)
-  }
+  var replicaManager: ReplicaManager = null
 
-  /**
-   * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has
-   * one  partition with one follower replica on broker "1".  The leader replica on "0"
-   * has HW of "5" and LEO of "20".  The follower on broker "1" has a local replica
-   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
-   * but is still in ISR (hasn't yet expired from ISR).
-   *
-   * When the follower from broker "1" fetches data, it should see data upto the log end offset ("20")
-   */
-  def testReplicaSeesLeoWhenFetching() {
-    /* setup */
-    val time = new MockTime
-    val leo = 20
-    val hw = 5
-
-    val messages = new Message("test-message".getBytes())
-
-    val followerReplicaId = configs(1).brokerId
-    val followerLEO = 15
+  override def setUp() {
+    super.setUp()
 
+    // create nice mock since we don't particularly care about zkclient calls
     val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
-    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
     EasyMock.replay(zkClient)
 
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
-    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(
+    // create nice mock since we don't particularly care about scheduler calls
+    val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
+    EasyMock.replay(scheduler)
+
+    // create the log which takes read with either HW max offset or none max offset
+    val log = EasyMock.createMock(classOf[Log])
+    EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
+    EasyMock.expect(log.read(0, fetchSize, Some(partitionHW))).andReturn(
       new FetchDataInfo(
-        new LogOffsetMetadata(followerLEO, 0L, followerLEO),
-        new ByteBufferMessageSet(messages)
+        new LogOffsetMetadata(0L, 0L, 0),
+        new ByteBufferMessageSet(messagesToHW)
+      )).anyTimes()
+    EasyMock.expect(log.read(0, fetchSize, None)).andReturn(
+      new FetchDataInfo(
+        new LogOffsetMetadata(0L, 0L, 0),
+        new ByteBufferMessageSet(messagesToLEO)
       )).anyTimes()
     EasyMock.replay(log)
 
+    // create the log manager that is aware of this mock log
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(topicAndPartition)).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
-    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
-    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
-    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
-      val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
-      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
-      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
-    }).anyTimes()
-    EasyMock.replay(replicaManager)
-
-    val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
-    partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)
-
-    EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
-    EasyMock.expect(replicaManager.updateReplicaLEOAndPartitionHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)))
-    EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
-    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
-    EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject()))
-    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
-      val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
-      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
-      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
-    }).anyTimes()
-    EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes()
-    EasyMock.replay(replicaManager)
-
-    val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
-
-    val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
-
-    val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
-    val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
-    EasyMock.replay(partitionStateInfo)
-
-    /**
-     * This fetch, coming from a replica, requests all data at offset "15".  Because the request is coming
-     * from a follower, the leader should oblige and read beyond the HW.
-     */
-    val bigFetch = new FetchRequestBuilder()
-      .replicaId(followerReplicaId)
-      .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE)
-      .build()
-
-    val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch)
-
-    // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeMs=1))
-
-    /**
-     * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after
-     * an offset of 15
-     */
-    EasyMock.verify(log)
-  }
+    // create the replica manager
+    replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, logManager, new AtomicBoolean(false))
+
+    // add the partition with two replicas, both in ISR
+    val partition = replicaManager.getOrCreatePartition(topic, partitionId)
 
-  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
-                                               localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
-    val partition = new Partition(topic, partitionId, time, replicaManager)
-    val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
+    // create the leader replica with the local log
+    val leaderReplica = new Replica(configs(0).brokerId, partition, time, 0, Some(log))
+    leaderReplica.highWatermark = new LogOffsetMetadata(partitionHW)
+    partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
 
-    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
+    // create the follower replica with defined log end offset
+    val followerReplica= new Replica(configs(1).brokerId, partition, time)
+    followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
+
+    // add both of them to ISR
+    val allReplicas = List(leaderReplica, followerReplica)
     allReplicas.foreach(partition.addReplicaIfNotExists(_))
-    // set in sync replicas for this partition to all the assigned replicas
     partition.inSyncReplicas = allReplicas.toSet
-    // set the leader and its hw and the hw update time
-    partition.leaderReplicaIdOpt = Some(leaderId)
-    leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW)
-    partition
   }
 
-  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
-    configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition, time)
-    }
+  override def tearDown() {
+    replicaManager.shutdown(false)
+    super.tearDown()
   }
 
+  /**
+   * The scenario for this test is that there is one topic that has one partition
+   * with one leader replica on broker "0" and one follower replica on broker "1"
+   * inside the replica manager's metadata.
+   *
+   * The leader replica on "0" has HW of "5" and LEO of "20".  The follower on
+   * broker "1" has a local replica with a HW matching the leader's ("5") and
+   * LEO of "15", meaning it's not in-sync but is still in ISR (hasn't yet expired from ISR).
+   *
+   * When a fetch operation with read committed data turned on is received, the replica manager
+   * should only return data up to the HW of the partition; when a fetch operation with read
+   * committed data turned off is received, the replica manager could return data up to the LEO
+   * of the local leader replica's log.
+   */
+  def testReadFromLog() {
+
+    assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
+      replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
+
+    assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
+      replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
+  }
 }


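The SimpleFetchTest rewrite above no longer drives KafkaApis against a mocked ReplicaManager;
it builds a real ReplicaManager around a mocked Log and calls readFromLocalLog directly. The
second boolean selects whether the read is bounded by the partition high watermark (committed
data only) or may run to the log end offset, which is exactly what testReadFromLog asserts. A
caller-side sketch, not part of the patch; the parameter names in the comments are a reading
of the two booleans and are not spelled out in the hunk:

    import kafka.api.PartitionFetchInfo
    import kafka.common.TopicAndPartition
    import kafka.server.ReplicaManager

    // Read up to fetchSize bytes starting at offset 0 for a single partition.
    def readOnePartition(rm: ReplicaManager, tp: TopicAndPartition, fetchSize: Int) = {
      val fetchInfo = Map(tp -> PartitionFetchInfo(0L, fetchSize))

      // (fetch only from leader = true, read only committed = true):
      // a regular consumer must only see offsets below the partition HW.
      val committed = rm.readFromLocalLog(true, true, fetchInfo)(tp).info.messageSet

      // (fetch only from leader = true, read only committed = false):
      // a follower replica may read past the HW, up to the leader's LEO.
      val beyondHW = rm.readFromLocalLog(true, false, fetchInfo)(tp).info.messageSet

      (committed, beyondHW)
    }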