kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6431: Shard purgatory to mitigate lock contention (#5338)
Date Fri, 04 Jan 2019 00:09:01 GMT
This is an automated email from the ASF dual-hosted git repository.

sriharsha pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 459a4dd  KAFKA-6431: Shard purgatory to mitigate lock contention (#5338)
459a4dd is described below

commit 459a4dd032fe51f651a6d74170d64e3b041ba480
Author: ying-zheng <zheng.ying@rocketmail.com>
AuthorDate: Thu Jan 3 16:08:49 2019 -0800

    KAFKA-6431: Shard purgatory to mitigate lock contention (#5338)
    
    * Shard purgatory to reduce lock contention
    
    * put constant into Object, use foldLeft instead of for loop
    
    * watchersForKey -> watchersByKey
    
    * Incorporate Jun's comments: use named arguments instead of _, and remove
    an unnecessary lock
    
    Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Jun Rao <junrao@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../main/scala/kafka/server/DelayedOperation.scala | 59 ++++++++++++++--------
 1 file changed, 39 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2a096e1..eb20e6d 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,11 +19,11 @@ package kafka.server
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.{Lock, ReentrantLock}
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.utils.timer._
 
@@ -147,6 +147,8 @@ abstract class DelayedOperation(override val delayMs: Long,
 
 object DelayedOperationPurgatory {
 
+  private val Shards = 512 // Shard the watcher list to reduce lock contention
+
   def apply[T <: DelayedOperation](purgatoryName: String,
                                    brokerId: Int = 0,
                                    purgeInterval: Int = 1000,
@@ -168,11 +170,25 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
                                                              reaperEnabled: Boolean = true,
                                                              timerEnabled: Boolean = true)
         extends Logging with KafkaMetricsGroup {
-
   /* a list of operation watching keys */
-  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
+  private class WatcherList {
+    val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
+
+    val watchersLock = new ReentrantLock()
 
-  private val removeWatchersLock = new ReentrantReadWriteLock()
+    /*
+     * Return all the current watcher lists,
+     * note that the returned watchers may be removed from the list by other threads
+     */
+    def allWatchers = {
+      watchersByKey.values
+    }
+  }
+
+  private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)
+  private def watcherList(key: Any): WatcherList = {
+    watcherLists(Math.abs(key.hashCode() % watcherLists.length))
+  }
 
   // the number of estimated total operations in the purgatory
   private[this] val estimatedTotalOperations = new AtomicInteger(0)
@@ -270,7 +286,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * @return the number of completed operations during this process
    */
   def checkAndComplete(key: Any): Int = {
-    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
+    val wl = watcherList(key)
+    val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
     if(watchers == null)
       0
     else
@@ -282,7 +299,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * on multiple lists, and some of its watched entries may still be in the watch lists
    * even when it has been completed, this number may be larger than the number of real operations watched
    */
-  def watched: Int = allWatchers.map(_.countWatched).sum
+  def watched: Int = {
+    watcherLists.foldLeft(0) { case (sum, watcherList) => sum + watcherList.allWatchers.map(_.countWatched).sum }
+  }
 
   /**
    * Return the number of delayed operations in the expiry queue
@@ -293,27 +312,24 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
     * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
     */
   def cancelForKey(key: Any): List[T] = {
-    inWriteLock(removeWatchersLock) {
-      val watchers = watchersForKey.remove(key)
+    val wl = watcherList(key)
+    inLock(wl.watchersLock) {
+      val watchers = wl.watchersByKey.remove(key)
       if (watchers != null)
         watchers.cancel()
       else
         Nil
     }
   }
-  /*
-   * Return all the current watcher lists,
-   * note that the returned watchers may be removed from the list by other threads
-   */
-  private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }
 
   /*
    * Return the watch list of the given key, note that we need to
    * grab the removeWatchersLock to avoid the operation being added to a removed watcher list
    */
   private def watchForOperation(key: Any, operation: T) {
-    inReadLock(removeWatchersLock) {
-      val watcher = watchersForKey.getAndMaybePut(key)
+    val wl = watcherList(key)
+    inLock(wl.watchersLock) {
+      val watcher = wl.watchersByKey.getAndMaybePut(key)
       watcher.watch(operation)
     }
   }
@@ -322,13 +338,14 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * Remove the key from watcher lists if its list is empty
    */
   private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
-    inWriteLock(removeWatchersLock) {
+    val wl = watcherList(key)
+    inLock(wl.watchersLock) {
       // if the current key is no longer correlated to the watchers to remove, skip
-      if (watchersForKey.get(key) != watchers)
+      if (wl.watchersByKey.get(key) != watchers)
         return
 
       if (watchers != null && watchers.isEmpty) {
-        watchersForKey.remove(key)
+        wl.watchersByKey.remove(key)
       }
     }
   }
@@ -424,7 +441,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
       // a little overestimated total number of operations.
       estimatedTotalOperations.getAndSet(delayed)
       debug("Begin purging watch lists")
-      val purged = allWatchers.map(_.purgeCompleted()).sum
+      val purged = watcherLists.foldLeft(0) {
+        case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
+      }
       debug("Purged %d elements from watch lists.".format(purged))
     }
   }


Mime
View raw message