kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-1720; Renamed Delayed Operations after KAFKA-1583; reviewed by Gwen Shapira and Joel Koshy
Date Thu, 04 Dec 2014 21:52:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7e9368baf -> 3cc10d5ff


KAFKA-1720; Renamed Delayed Operations after KAFKA-1583; reviewed by Gwen Shapira and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cc10d5f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cc10d5f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cc10d5f

Branch: refs/heads/trunk
Commit: 3cc10d5ff9bf73cff2045685f89d71fee92a41f4
Parents: 7e9368b
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Thu Dec 4 13:51:44 2014 -0800
Committer: Guozhang Wang <guwang@linkedin.com>
Committed: Thu Dec 4 13:51:44 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  14 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |  10 +-
 .../scala/kafka/server/DelayedOperation.scala   | 316 ++++++++++++++++++
 .../kafka/server/DelayedOperationKey.scala      |  38 +++
 .../scala/kafka/server/DelayedProduce.scala     |  14 +-
 .../scala/kafka/server/ReplicaManager.scala     |  42 +--
 .../scala/kafka/server/RequestPurgatory.scala   | 317 -------------------
 .../kafka/server/DelayedOperationTest.scala     | 124 ++++++++
 .../kafka/server/RequestPurgatoryTest.scala     | 124 --------
 system_test/metrics.json                        |   4 +-
 10 files changed, 520 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b9fde2a..b230e9a 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -17,18 +17,18 @@
 package kafka.cluster
 
 import kafka.common._
-import kafka.admin.AdminUtils
 import kafka.utils._
+import kafka.utils.Utils.{inReadLock,inWriteLock}
+import kafka.admin.AdminUtils
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager}
+import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
 
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.Utils.{inReadLock,inWriteLock}
 import scala.collection.immutable.Set
 
 import com.yammer.metrics.core.Gauge
@@ -232,7 +232,7 @@ class Partition(val topic: String,
   /**
    * Update the log end offset of a certain replica of this partition
    */
-  def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) = {
+  def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) {
     getReplica(replicaId) match {
       case Some(replica) =>
         replica.logEndOffset = offset
@@ -343,8 +343,8 @@ class Partition(val topic: String,
     if(oldHighWatermark.precedes(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
       debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
-      // some delayed requests may be unblocked after HW changed
-      val requestKey = new TopicAndPartition(this.topic, this.partitionId)
+      // some delayed operations may be unblocked after HW changed
+      val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
       replicaManager.tryCompleteDelayedFetch(requestKey)
       replicaManager.tryCompleteDelayedProduce(requestKey)
     } else {
@@ -414,7 +414,7 @@ class Partition(val topic: String,
 
           val info = log.append(messages, assignOffsets = true)
           // probably unblock some follower fetch requests since log end offset has been updated
-          replicaManager.tryCompleteDelayedFetch(new TopicAndPartition(this.topic, this.partitionId))
+          replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           info

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 1e2e56f..dd602ee 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -32,7 +32,7 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf
 }
 
 /**
- * The fetch metadata maintained by the delayed produce request
+ * The fetch metadata maintained by the delayed fetch operation
  */
 case class FetchMetadata(fetchMinBytes: Int,
                          fetchOnlyLeader: Boolean,
@@ -45,17 +45,17 @@ case class FetchMetadata(fetchMinBytes: Int,
                           "partitionStatus: " + fetchPartitionStatus + "]"
 }
 /**
- * A delayed fetch request that can be created by the replica manager and watched
- * in the fetch request purgatory
+ * A delayed fetch operation that can be created by the replica manager and watched
+ * in the fetch operation purgatory
  */
 class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
                    responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
-  extends DelayedRequest(delayMs) {
+  extends DelayedOperation(delayMs) {
 
   /**
-   * The request can be completed if:
+   * The operation can be completed if:
    *
    * Case A: This broker is no longer the leader for some partitions it tries to fetch
    * Case B: This broker does not know of some partitions it tries to fetch

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
new file mode 100644
index 0000000..fc06b01
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils._
+import kafka.metrics.KafkaMetricsGroup
+
+import java.util
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+
+import com.yammer.metrics.core.Gauge
+
+
+/**
+ * An operation whose processing needs to be delayed for at most the given delayMs. For example
+ * a delayed produce operation could be waiting for specified number of acks; or
+ * a delayed fetch operation could be waiting for a given number of bytes to accumulate.
+ *
+ * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
+ * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
+ * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
+ * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
+ * forceComplete().
+ *
+ * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
+ */
+abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) {
+  private val completed = new AtomicBoolean(false)
+
+  /*
+   * Force completing the delayed operation, if not already completed.
+   * This function can be triggered when
+   *
+   * 1. The operation has been verified to be completable inside tryComplete()
+   * 2. The operation has expired and hence needs to be completed right now
+   *
+   * Return true iff the operation is completed by the caller: note that
+   * concurrent threads can try to complete the same operation, but only
+   * the first thread will succeed in completing the operation and return
+   * true, others will still return false
+   */
+  def forceComplete(): Boolean = {
+    if (completed.compareAndSet(false, true)) {
+      onComplete()
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Check if the delayed operation is already completed
+   */
+  def isCompleted(): Boolean = completed.get()
+
+  /**
+   * Process for completing an operation; This function needs to be defined
+   * in subclasses and will be called exactly once in forceComplete()
+   */
+  def onComplete(): Unit
+
+  /*
+   * Try to complete the delayed operation by first checking if the operation
+   * can be completed by now. If yes execute the completion logic by calling
+   * forceComplete() and return true iff forceComplete returns true; otherwise return false
+   *
+   * This function needs to be defined in subclasses
+   */
+  def tryComplete(): Boolean
+}
+
+/**
+ * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
+ */
+class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000)
+        extends Logging with KafkaMetricsGroup {
+
+  /* a list of operation watching keys */
+  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+
+  /* background thread expiring operations that have timed out */
+  private val expirationReaper = new ExpiredOperationReaper
+
+  newGauge(
+    "PurgatorySize",
+    new Gauge[Int] {
+      def value = watched()
+    }
+  )
+
+  newGauge(
+    "NumDelayedOperations",
+    new Gauge[Int] {
+      def value = delayed()
+    }
+  )
+
+  expirationReaper.start()
+
+  /**
+   * Check if the operation can be completed, if not watch it based on the given watch keys
+   *
+   * Note that a delayed operation can be watched on multiple keys. It is possible that
+   * an operation is completed after it has been added to the watch list for some, but
+   * not all of the keys. In this case, the operation is considered completed and won't
+   * be added to the watch list of the remaining keys. The expiration reaper thread will
+   * remove this operation from any watcher list in which the operation exists.
+   *
+   * @param operation the delayed operation to be checked
+   * @param watchKeys keys for bookkeeping the operation
+   * @return true iff the delayed operation can be completed by the caller
+   */
+  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
+    for(key <- watchKeys) {
+      // if the operation is already completed, stop adding it to
+      // any further lists and return false
+      if (operation.isCompleted())
+        return false
+      val watchers = watchersFor(key)
+      // if the operation can be completed by myself, stop adding it to
+      // any further lists and return true immediately
+      if(operation synchronized operation.tryComplete()) {
+        return true
+      } else {
+        watchers.watch(operation)
+      }
+    }
+
+    // if it cannot be completed by now and hence is watched, add to the expire queue also
+    if (! operation.isCompleted()) {
+      expirationReaper.enqueue(operation)
+    }
+
+    false
+  }
+
+  /**
+   * Check if some delayed operations can be completed with the given watch key,
+   * and if yes complete them.
+   *
+   * @return the number of completed operations during this process
+   */
+  def checkAndComplete(key: Any): Int = {
+    val watchers = watchersForKey.get(key)
+    if(watchers == null)
+      0
+    else
+      watchers.tryCompleteWatched()
+  }
+
+  /**
+   * Return the total size of watch lists in the purgatory. Since an operation may be watched
+   * on multiple lists, and some of its watched entries may still be in the watch lists
+   * even when it has been completed, this number may be larger than the number of real operations watched
+   */
+  def watched() = watchersForKey.values.map(_.watched).sum
+
+  /**
+   * Return the number of delayed operations in the expiry queue
+   */
+  def delayed() = expirationReaper.delayed
+
+  /*
+   * Return the watch list of the given key
+   */
+  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
+
+  /**
+   * Shutdown the expire reaper thread
+   */
+  def shutdown() {
+    expirationReaper.shutdown()
+  }
+
+  /**
+   * A linked list of watched delayed operations based on some key
+   */
+  private class Watchers {
+    private val operations = new util.LinkedList[T]
+
+    def watched = operations.size()
+
+    // add the element to watch
+    def watch(t: T) {
+      synchronized {
+        operations.add(t)
+      }
+    }
+
+    // traverse the list and try to complete some watched elements
+    def tryCompleteWatched(): Int = {
+      var completed = 0
+      synchronized {
+        val iter = operations.iterator()
+        while(iter.hasNext) {
+          val curr = iter.next
+          if (curr.isCompleted()) {
+            // another thread has completed this operation, just remove it
+            iter.remove()
+          } else {
+            if(curr synchronized curr.tryComplete()) {
+              iter.remove()
+              completed += 1
+            }
+          }
+        }
+      }
+      completed
+    }
+
+    // traverse the list and purge elements that are already completed by others
+    def purgeCompleted(): Int = {
+      var purged = 0
+      synchronized {
+        val iter = operations.iterator()
+        while (iter.hasNext) {
+          val curr = iter.next
+          if(curr.isCompleted()) {
+            iter.remove()
+            purged += 1
+          }
+        }
+      }
+      purged
+    }
+  }
+
+  /**
+   * A background reaper to expire delayed operations that have timed out
+   */
+  private class ExpiredOperationReaper extends ShutdownableThread(
+    "ExpirationReaper-%d".format(brokerId),
+    false) {
+
+    /* The queue storing all delayed operations */
+    private val delayedQueue = new DelayQueue[T]
+
+    /*
+     * Return the number of delayed operations kept by the reaper
+     */
+    def delayed() = delayedQueue.size()
+
+    /*
+     * Add an operation to be expired
+     */
+    def enqueue(t: T) {
+      delayedQueue.add(t)
+    }
+
+    /**
+     * Try to get the next expired event and force completing it
+     */
+    private def expireNext() {
+      val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
+      if (curr != null.asInstanceOf[T]) {
+        // if there is an expired operation, try to force complete it
+        if (curr synchronized curr.forceComplete()) {
+          debug("Force complete expired delayed operation %s".format(curr))
+        }
+      }
+    }
+
+    /**
+     * Delete all completed operations from the delay queue; watcher lists are purged separately in doWork()
+     */
+    private def purgeCompleted(): Int = {
+      var purged = 0
+
+      // purge the delayed queue
+      val iter = delayedQueue.iterator()
+      while (iter.hasNext) {
+        val curr = iter.next()
+        if (curr.isCompleted()) {
+          iter.remove()
+          purged += 1
+        }
+      }
+
+      purged
+    }
+
+    override def doWork() {
+      // try to get the next expired operation and force completing it
+      expireNext()
+      // see if we need to purge the watch lists
+      if (DelayedOperationPurgatory.this.watched() >= purgeInterval) {
+        debug("Begin purging watch lists")
+        val purged = watchersForKey.values.map(_.purgeCompleted()).sum
+        debug("Purged %d elements from watch lists.".format(purged))
+      }
+      // see if we need to purge the delayed operation queue
+      if (delayed() >= purgeInterval) {
+        debug("Begin purging delayed queue")
+        val purged = purgeCompleted()
+        debug("Purged %d operations from delayed queue.".format(purged))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
new file mode 100644
index 0000000..fb7e9ed
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Keys used for delayed operation metrics recording
+ */
+trait DelayedOperationKey {
+  def keyLabel: String
+}
+
+object DelayedOperationKey {
+  val globalLabel = "All"
+}
+
+case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
+
+  def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
+
+  override def keyLabel = "%s-%d".format(topic, partition)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 1603066..c229088 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -32,7 +32,7 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Producer
 }
 
 /**
- * The produce metadata maintained by the delayed produce request
+ * The produce metadata maintained by the delayed produce operation
  */
 case class ProduceMetadata(produceRequiredAcks: Short,
                            produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) {
@@ -42,14 +42,14 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 }
 
 /**
- * A delayed produce request that can be created by the replica manager and watched
- * in the produce request purgatory
+ * A delayed produce operation that can be created by the replica manager and watched
+ * in the produce operation purgatory
  */
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
                      responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
-  extends DelayedRequest(delayMs) {
+  extends DelayedOperation(delayMs) {
 
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
@@ -65,13 +65,13 @@ class DelayedProduce(delayMs: Long,
   }
 
   /**
-   * The delayed produce request can be completed if every partition
+   * The delayed produce operation can be completed if every partition
    * it produces to is satisfied by one of the following:
    *
    * Case A: This broker is no longer the leader: set an error in response
    * Case B: This broker is the leader:
    *   B.1 - If there was a local error thrown while checking if at least requiredAcks
-   *         replicas have caught up to this request: set an error in response
+   *         replicas have caught up to this operation: set an error in response
    *   B.2 - Otherwise, set the response with no error.
    */
   override def tryComplete(): Boolean = {
@@ -117,4 +117,4 @@ class DelayedProduce(delayMs: Long,
     val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
     responseCallback(responseStatus)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b3566b0..e58fbb9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -80,8 +80,8 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
 
-  val producerRequestPurgatory = new RequestPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
-  val fetchRequestPurgatory = new RequestPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
+  val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
+  val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
 
 
   newGauge(
@@ -123,9 +123,9 @@ class ReplicaManager(val config: KafkaConfig,
    * 1. The partition HW has changed (for acks = -1)
    * 2. A follower replica's fetch operation is received (for acks > 1)
    */
-  def tryCompleteDelayedProduce(key: TopicAndPartition) {
-    val completed = producerRequestPurgatory.checkAndComplete(key)
-    debug("Request key %s unblocked %d producer requests.".format(key, completed))
+  def tryCompleteDelayedProduce(key: DelayedOperationKey) {
+    val completed = delayedProducePurgatory.checkAndComplete(key)
+    debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
   }
 
   /**
@@ -135,9 +135,9 @@ class ReplicaManager(val config: KafkaConfig,
    * 1. The partition HW has changed (for regular fetch)
    * 2. A new message set is appended to the local log (for follower fetch)
    */
-  def tryCompleteDelayedFetch(key: TopicAndPartition) {
-    val completed = fetchRequestPurgatory.checkAndComplete(key)
-    debug("Request key %s unblocked %d fetch requests.".format(key, completed))
+  def tryCompleteDelayedFetch(key: DelayedOperationKey) {
+    val completed = delayedFetchPurgatory.checkAndComplete(key)
+    debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
   }
 
   def startup() {
@@ -280,13 +280,13 @@ class ReplicaManager(val config: KafkaConfig,
       val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
       val delayedProduce =  new DelayedProduce(timeout, produceMetadata, this, responseCallback)
 
-      // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val producerRequestKeys = messagesPerPartition.keys.toSeq
+      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
+      val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
 
       // try to complete the request immediately, otherwise put it into the purgatory
-      // this is because while the delayed request is being created, new requests may
-      // arrive which can make this request completable.
-      producerRequestPurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+      // this is because while the delayed produce operation is being created, new
+      // requests may arrive and hence make this operation completable.
+      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
     }
   }
 
@@ -392,13 +392,13 @@ class ReplicaManager(val config: KafkaConfig,
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
 
-      // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchPartitionStatus.keys.toSeq
+      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
+      val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionOperationKey(_)).toSeq
 
       // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed request is being created, new requests may
-      // arrive which can make this request completable.
-      fetchRequestPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+      // this is because while the delayed fetch operation is being created, new requests
+      // may arrive and hence make this operation completable.
+      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
     }
   }
 
@@ -718,7 +718,7 @@ class ReplicaManager(val config: KafkaConfig,
 
           // for producer requests with ack > 1, we need to check
           // if they can be unblocked after some follower's log end offsets have moved
-          tryCompleteDelayedProduce(topicAndPartition)
+          tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicAndPartition))
         case None =>
           warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition))
       }
@@ -749,8 +749,8 @@ class ReplicaManager(val config: KafkaConfig,
   def shutdown(checkpointHW: Boolean = true) {
     info("Shutting down")
     replicaFetcherManager.shutdown()
-    fetchRequestPurgatory.shutdown()
-    producerRequestPurgatory.shutdown()
+    delayedFetchPurgatory.shutdown()
+    delayedProducePurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
     info("Shut down completely")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
deleted file mode 100644
index 323b12e..0000000
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
-
-import java.util
-import java.util.concurrent._
-import java.util.concurrent.atomic._
-import scala.collection._
-
-import com.yammer.metrics.core.Gauge
-
-
-/**
- * An operation whose processing needs to be delayed for at most the given delayMs. For example
- * a delayed produce operation could be waiting for specified number of acks; or
- * a delayed fetch operation could be waiting for a given number of bytes to accumulate.
- *
- * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
- * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
- * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
- * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
- * forceComplete().
- *
- * A subclass of DelayedRequest needs to provide an implementation of both onComplete() and tryComplete().
- */
-abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) {
-  private val completed = new AtomicBoolean(false)
-
-  /*
-   * Force completing the delayed operation, if not already completed.
-   * This function can be triggered when
-   *
-   * 1. The operation has been verified to be completable inside tryComplete()
-   * 2. The operation has expired and hence needs to be completed right now
-   *
-   * Return true iff the operation is completed by the caller
-   */
-  def forceComplete(): Boolean = {
-    if (completed.compareAndSet(false, true)) {
-      onComplete()
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * Check if the delayed operation is already completed
-   */
-  def isCompleted(): Boolean = completed.get()
-
-  /**
-   * Process for completing an operation; This function needs to be defined in subclasses
-   * and will be called exactly once in forceComplete()
-   */
-  def onComplete(): Unit
-
-  /*
-   * Try to complete the delayed operation by first checking if the operation
-   * can be completed by now. If yes execute the completion logic by calling
-   * forceComplete() and return true iff forceComplete returns true; otherwise return false
-   *
-   * Note that concurrent threads can check if an operation can be completed or not,
-   * but only the first thread will succeed in completing the operation and return
-   * true, others will still return false
-   *
-   * this function needs to be defined in subclasses
-   */
-  def tryComplete(): Boolean
-}
-
-/**
- * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
- */
-class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
-        extends Logging with KafkaMetricsGroup {
-
-  /* a list of requests watching each key */
-  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
-
-  /* background thread expiring requests that have been waiting too long */
-  private val expirationReaper = new ExpiredOperationReaper
-
-  newGauge(
-    "PurgatorySize",
-    new Gauge[Int] {
-      def value = watched()
-    }
-  )
-
-  newGauge(
-    "NumDelayedRequests",
-    new Gauge[Int] {
-      def value = delayed()
-    }
-  )
-
-  expirationReaper.start()
-
-  /**
-   * Check if the operation can be completed, if not watch it based on the given watch keys
-   *
-   * Note that a delayed operation can be watched on multiple keys. It is possible that
-   * an operation is completed after it has been added to the watch list for some, but
-   * not all of the keys. In this case, the operation is considered completed and won't
-   * be added to the watch list of the remaining keys. The expiration reaper thread will
-   * remove this operation from any watcher list in which the operation exists.
-   *
-   * @param operation the delayed operation to be checked
-   * @param watchKeys keys for bookkeeping the operation
-   * @return true iff the delayed operations can be completed by the caller
-   */
-  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
-    for(key <- watchKeys) {
-      // if the operation is already completed, stop adding it to
-      // any further lists and return false
-      if (operation.isCompleted())
-        return false
-      val watchers = watchersFor(key)
-      // if the operation can be completed by the caller, stop adding it to
-      // any further lists and return true immediately
-      if(operation synchronized operation.tryComplete()) {
-        return true
-      } else {
-        watchers.watch(operation)
-      }
-    }
-
-    // if it cannot be completed by now and hence is watched, add to the expire queue also
-    if (! operation.isCompleted()) {
-      expirationReaper.enqueue(operation)
-    }
-
-    false
-  }
-
-  /**
-   * Check if some delayed requests can be completed with the given watch key,
-   * and if yes complete them.
-   *
-   * @return the number of completed requests during this process
-   */
-  def checkAndComplete(key: Any): Int = {
-    val watchers = watchersForKey.get(key)
-    if(watchers == null)
-      0
-    else
-      watchers.tryCompleteWatched()
-  }
-
-  /**
-   * Return the total size of watch lists in the purgatory. Since an operation may be watched
-   * on multiple lists, and some of its watched entries may still be in the watch lists
-   * even when it has been completed, this number may be larger than the number of real operations watched
-   */
-  def watched() = watchersForKey.values.map(_.watched).sum
-
-  /**
-   * Return the number of delayed operations in the expiry queue
-   */
-  def delayed() = expirationReaper.delayed
-
-  /*
-   * Return the watch list of the given key
-   */
-  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
-
-  /**
-   * Shutdown the expire reaper thread
-   */
-  def shutdown() {
-    expirationReaper.shutdown()
-  }
-
-  /**
-   * A linked list of watched delayed operations based on some key
-   */
-  private class Watchers {
-    private val requests = new util.LinkedList[T]
-
-    def watched = requests.size()
-
-    // add the element to watch
-    def watch(t: T) {
-      synchronized {
-        requests.add(t)
-      }
-    }
-
-    // traverse the list and try to complete some watched elements
-    def tryCompleteWatched(): Int = {
-      var completed = 0
-      synchronized {
-        val iter = requests.iterator()
-        while(iter.hasNext) {
-          val curr = iter.next
-          if (curr.isCompleted()) {
-            // another thread has completed this request, just remove it
-            iter.remove()
-          } else {
-            if(curr synchronized curr.tryComplete()) {
-              iter.remove()
-              completed += 1
-            }
-          }
-        }
-      }
-      completed
-    }
-
-    // traverse the list and purge elements that are already completed by others
-    def purgeCompleted(): Int = {
-      var purged = 0
-      synchronized {
-        val iter = requests.iterator()
-        while (iter.hasNext) {
-          val curr = iter.next
-          if(curr.isCompleted()) {
-            iter.remove()
-            purged += 1
-          }
-        }
-      }
-      purged
-    }
-  }
-
-  /**
-   * A background reaper to expire delayed operations that have timed out
-   */
-  private class ExpiredOperationReaper extends ShutdownableThread(
-    "ExpirationReaper-%d".format(brokerId),
-    false) {
-
-    /* The queue storing all delayed operations */
-    private val delayedQueue = new DelayQueue[T]
-
-    /*
-     * Return the number of delayed operations kept by the reaper
-     */
-    def delayed() = delayedQueue.size()
-
-    /*
-     * Add an operation to be expired
-     */
-    def enqueue(t: T) {
-      delayedQueue.add(t)
-    }
-
-    /**
-     * Try to get the next expired event and force completing it
-     */
-    private def expireNext() {
-      val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
-      if (curr != null.asInstanceOf[T]) {
-        // if there is an expired operation, try to force complete it
-        if (curr synchronized curr.forceComplete()) {
-          debug("Force complete expired delayed operation %s".format(curr))
-        }
-      }
-    }
-
-    /**
-     * Delete all satisfied events from the delay queue and the watcher lists
-     */
-    private def purgeCompleted(): Int = {
-      var purged = 0
-
-      // purge the delayed queue
-      val iter = delayedQueue.iterator()
-      while (iter.hasNext) {
-        val curr = iter.next()
-        if (curr.isCompleted()) {
-          iter.remove()
-          purged += 1
-        }
-      }
-
-      purged
-    }
-
-    override def doWork() {
-      // try to get the next expired operation and force completing it
-      expireNext()
-      // see if we need to purge the watch lists
-      if (RequestPurgatory.this.watched() >= purgeInterval) {
-        debug("Begin purging watch lists")
-        val purged = watchersForKey.values.map(_.purgeCompleted()).sum
-        debug("Purged %d elements from watch lists.".format(purged))
-      }
-      // see if we need to purge the delayed request queue
-      if (delayed() >= purgeInterval) {
-        debug("Begin purging delayed queue")
-        val purged = purgeCompleted()
-        debug("Purged %d operations from delayed queue.".format(purged))
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
new file mode 100644
index 0000000..93f52d3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+import kafka.utils.TestUtils
+
+class DelayedOperationTest extends JUnit3Suite {
+
+  var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
+  
+  override def setUp() {
+    super.setUp()
+    purgatory = new DelayedOperationPurgatory[MockDelayedOperation](0, 5)
+  }
+  
+  override def tearDown() {
+    purgatory.shutdown()
+    super.tearDown()
+  }
+
+  @Test
+  def testRequestSatisfaction() {
+    val r1 = new MockDelayedOperation(100000L)
+    val r2 = new MockDelayedOperation(100000L)
+    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.checkAndComplete("test1"))
+    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
+    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test1"))
+    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
+    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test2"))
+    r1.completable = true
+    assertEquals("r1 satisfied", 1, purgatory.checkAndComplete("test1"))
+    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test1"))
+    r2.completable = true
+    assertEquals("r2 satisfied", 1, purgatory.checkAndComplete("test2"))
+    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test2"))
+  }
+
+  @Test
+  def testRequestExpiry() {
+    val expiration = 20L
+    val r1 = new MockDelayedOperation(expiration)
+    val r2 = new MockDelayedOperation(200000L)
+    val start = System.currentTimeMillis
+    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
+    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
+    r1.awaitExpiration()
+    val elapsed = System.currentTimeMillis - start
+    assertTrue("r1 completed due to expiration", r1.isCompleted())
+    assertFalse("r2 hasn't completed", r2.isCompleted())
+    assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
+  }
+
+  @Test
+  def testRequestPurge() {
+    val r1 = new MockDelayedOperation(100000L)
+    val r2 = new MockDelayedOperation(100000L)
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+    purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
+    purgatory.tryCompleteElseWatch(r1, Array("test2", "test3"))
+
+    assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched())
+    assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed())
+
+    // complete one of the operations, it should
+    // eventually be purged from the watch list with purge interval 5
+    r2.completable = true
+    r2.tryComplete()
+    TestUtils.waitUntilTrue(() => purgatory.watched() == 3,
+      "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L)
+    TestUtils.waitUntilTrue(() => purgatory.delayed() == 3,
+      "Purgatory should still have 3 total delayed operations instead of " + purgatory.delayed(), 1000L)
+
+    // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+
+    TestUtils.waitUntilTrue(() => purgatory.watched() == 5,
+      "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L)
+    TestUtils.waitUntilTrue(() => purgatory.delayed() == 4,
+      "Purgatory should have 4 total delayed operations instead of " + purgatory.delayed(), 1000L)
+  }
+  
+  class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) {
+    var completable = false
+
+    def awaitExpiration() {
+      synchronized {
+        wait()
+      }
+    }
+
+    override def tryComplete() = {
+      if (completable)
+        forceComplete()
+      else
+        false
+    }
+
+    override def onComplete() {
+      synchronized {
+        notify()
+      }
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
deleted file mode 100644
index a7720d5..0000000
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
-import kafka.utils.TestUtils
-
-class RequestPurgatoryTest extends JUnit3Suite {
-
-  var purgatory: RequestPurgatory[MockDelayedRequest] = null
-  
-  override def setUp() {
-    super.setUp()
-    purgatory = new RequestPurgatory[MockDelayedRequest](0, 5)
-  }
-  
-  override def tearDown() {
-    purgatory.shutdown()
-    super.tearDown()
-  }
-
-  @Test
-  def testRequestSatisfaction() {
-    val r1 = new MockDelayedRequest(100000L)
-    val r2 = new MockDelayedRequest(100000L)
-    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.checkAndComplete("test1"))
-    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
-    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test1"))
-    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
-    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test2"))
-    r1.completable = true
-    assertEquals("r1 satisfied", 1, purgatory.checkAndComplete("test1"))
-    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test1"))
-    r2.completable = true
-    assertEquals("r2 satisfied", 1, purgatory.checkAndComplete("test2"))
-    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test2"))
-  }
-
-  @Test
-  def testRequestExpiry() {
-    val expiration = 20L
-    val r1 = new MockDelayedRequest(expiration)
-    val r2 = new MockDelayedRequest(200000L)
-    val start = System.currentTimeMillis
-    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
-    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
-    r1.awaitExpiration()
-    val elapsed = System.currentTimeMillis - start
-    assertTrue("r1 completed due to expiration", r1.isCompleted())
-    assertFalse("r2 hasn't completed", r2.isCompleted())
-    assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
-  }
-
-  @Test
-  def testRequestPurge() {
-    val r1 = new MockDelayedRequest(100000L)
-    val r2 = new MockDelayedRequest(100000L)
-    purgatory.tryCompleteElseWatch(r1, Array("test1"))
-    purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
-    purgatory.tryCompleteElseWatch(r1, Array("test2", "test3"))
-
-    assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched())
-    assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed())
-
-    // complete one of the operations, it should
-    // eventually be purged from the watch list with purge interval 5
-    r2.completable = true
-    r2.tryComplete()
-    TestUtils.waitUntilTrue(() => purgatory.watched() == 3,
-      "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L)
-    TestUtils.waitUntilTrue(() => purgatory.delayed() == 3,
-      "Purgatory should still have 3 total delayed requests instead of " + purgatory.delayed(), 1000L)
-
-    // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5
-    purgatory.tryCompleteElseWatch(r1, Array("test1"))
-    purgatory.tryCompleteElseWatch(r1, Array("test1"))
-
-    TestUtils.waitUntilTrue(() => purgatory.watched() == 5,
-      "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L)
-    TestUtils.waitUntilTrue(() => purgatory.delayed() == 4,
-      "Purgatory should have 4 total delayed requests instead of " + purgatory.delayed(), 1000L)
-  }
-  
-  class MockDelayedRequest(delayMs: Long) extends DelayedRequest(delayMs) {
-    var completable = false
-
-    def awaitExpiration() {
-      synchronized {
-        wait()
-      }
-    }
-
-    override def tryComplete() = {
-      if (completable)
-        forceComplete()
-      else
-        false
-    }
-
-    override def onComplete() {
-      synchronized {
-        notify()
-      }
-    }
-  }
-  
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cc10d5f/system_test/metrics.json
----------------------------------------------------------------------
diff --git a/system_test/metrics.json b/system_test/metrics.json
index cd3fc14..30dabe5 100644
--- a/system_test/metrics.json
+++ b/system_test/metrics.json
@@ -78,13 +78,13 @@
                {
                   "graph_name": "ProducePurgatoryQueueSize",
                   "y_label": "size",
-                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedOperations",
                   "attributes": "Value"
                },
                {
                   "graph_name": "FetchPurgatoryQueueSize",
                   "y_label": "size",
-                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedOperations",
                   "attributes": "Value"
                },
                {


Mime
View raw message