kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1952; High CPU Usage in 0.8.2 release; patched by Jun Rao; reviewed by Guozhang Wang, Ewen Cheslack-Postava and Neha Narkhede
Date Wed, 18 Feb 2015 21:15:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 7130da90a -> 375b15511


kafka-1952; High CPU Usage in 0.8.2 release; patched by Jun Rao; reviewed by Guozhang Wang,
Ewen Cheslack-Postava and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/375b1551
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/375b1551
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/375b1551

Branch: refs/heads/0.8.2
Commit: 375b15511a93313fce52442fb7d4eab4492da62e
Parents: 7130da9
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Feb 18 13:15:15 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Feb 18 13:15:15 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/RequestPurgatory.scala   | 69 +++++++++++++-------
 1 file changed, 46 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/375b1551/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 9d76234..87ee3be 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -92,28 +92,55 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
   expirationThread.start()
 
   /**
+   * Is this request satisfied by the caller thread?
+   */
+  private def isSatisfiedByMe(delayedRequest: T): Boolean = {
+    if(delayedRequest.satisfied.compareAndSet(false, true))
+      return true
+    else
+      return false
+  }
+
+  /**
    * Try to add the request for watch on all keys. Return true iff the request is
    * satisfied and the satisfaction is done by the caller.
-   *
-   * Requests can be watched on only a few of the keys if it is found satisfied when
-   * trying to add it to each one of the keys. In this case the request is still treated as satisfied
-   * and hence no longer watched. Those already added elements will be later purged by the expire reaper.
    */
   def checkAndMaybeWatch(delayedRequest: T): Boolean = {
+    if (delayedRequest.keys.size <= 0)
+      return isSatisfiedByMe(delayedRequest)
+
+    // The cost of checkSatisfied() is typically proportional to the number of keys. Calling
+    // checkSatisfied() for each key is going to be expensive if there are many keys. Instead,
+    // we do the check in the following way. Call checkSatisfied(). If the request is not satisfied,
+    // we just add the request to all keys. Then we call checkSatisfied() again. At this time, if
+    // the request is still not satisfied, we are guaranteed that it won't miss any future triggering
+    // events since the request is already on the watcher list for all keys. This does mean that
+    // if the request is satisfied (by another thread) between the two checkSatisfied() calls, the
+    // request is unnecessarily added for watch. However, this is a less severe issue since the
+    // expire reaper will clean it up periodically.
+
+    var isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)
+    if (isSatisfied)
+      return isSatisfiedByMe(delayedRequest)
+
     for(key <- delayedRequest.keys) {
       val lst = watchersFor(key)
-      if(!lst.checkAndMaybeAdd(delayedRequest)) {
-        if(delayedRequest.satisfied.compareAndSet(false, true))
-          return true
-        else
-          return false
+      if (!lst.addIfNotSatisfied(delayedRequest)) {
+        // The request is already satisfied by another thread. No need to watch for the rest of
+        // the keys.
+        return false
       }
     }
 
-    // if it is indeed watched, add to the expire queue also
-    expiredRequestReaper.enqueue(delayedRequest)
+    isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)
+    if (isSatisfied)
+      return isSatisfiedByMe(delayedRequest)
+    else {
+      // If the request is still not satisfied, add to the expire queue also.
+      expiredRequestReaper.enqueue(delayedRequest)
 
-    false
+      return false
+    }
   }
 
   /**
@@ -171,20 +198,16 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
     // return the size of the watch list
     def watched() = requests.size()
 
-    // potentially add the element to watch if it is not satisfied yet
-    def checkAndMaybeAdd(t: T): Boolean = {
+    // add the element to the watcher list if it's not already satisfied
+    def addIfNotSatisfied(t: T): Boolean = {
+      if (t.satisfied.get)
+        return false
+
       synchronized {
-        // if it is already satisfied, do not add to the watch list
-        if (t.satisfied.get)
-          return false
-        // synchronize on the delayed request to avoid any race condition
-        // with expire and update threads on client-side.
-        if(t synchronized checkSatisfied(t)) {
-          return false
-        }
         requests.add(t)
-        return true
       }
+
+      return true
     }
 
     // traverse the list and purge satisfied elements


Mime
View raw message