kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1952; High CPU Usage in 0.8.2 release; patched by Jun Rao; reviewed by Guozhang Wang, Ewen Cheslack-Postava and Neha Narkhede
Date Wed, 18 Feb 2015 21:39:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d5fbba633 -> f5684366e


kafka-1952; High CPU Usage in 0.8.2 release; patched by Jun Rao; reviewed by Guozhang Wang,
Ewen Cheslack-Postava and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f5684366
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f5684366
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f5684366

Branch: refs/heads/trunk
Commit: f5684366ef60125c4d799121a6c0adca4744e8ab
Parents: d5fbba6
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Feb 18 13:39:05 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Feb 18 13:39:05 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/DelayedOperation.scala   | 34 +++++++++++++-------
 1 file changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f5684366/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index fc06b01..1d11099 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -128,25 +128,37 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI
    * @return true iff the delayed operations can be completed by the caller
    */
   def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
+    assert(watchKeys.size > 0, "The watch key list can't be empty")
+
+    // The cost of tryComplete() is typically proportional to the number of keys. Calling
+    // tryComplete() for each key is going to be expensive if there are many keys. Instead,
+    // we do the check in the following way. Call tryComplete(). If the operation is not completed,
+    // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
+    // the operation is still not completed, we are guaranteed that it won't miss any future triggering
+    // event since the operation is already on the watcher list for all keys. This does mean that
+    // if the operation is completed (by another thread) between the two tryComplete() calls, the
+    // operation is unnecessarily added for watch. However, this is a less severe issue since the
+    // expire reaper will clean it up periodically.
+
+    var isCompletedByMe = operation synchronized operation.tryComplete()
+    if (isCompletedByMe)
+      return true
+
     for(key <- watchKeys) {
-      // if the operation is already completed, stopping adding it to
-      // any further lists and return false
+      // If the operation is already completed, stop adding it to the rest of the watcher list.
       if (operation.isCompleted())
         return false
       val watchers = watchersFor(key)
-      // if the operation can by completed by myself, stop adding it to
-      // any further lists and return true immediately
-      if(operation synchronized operation.tryComplete()) {
-        return true
-      } else {
-        watchers.watch(operation)
-      }
+      watchers.watch(operation)
     }
 
+    isCompletedByMe = operation synchronized operation.tryComplete()
+    if (isCompletedByMe)
+      return true
+
     // if it cannot be completed by now and hence is watched, add to the expire queue also
-    if (! operation.isCompleted()) {
+    if (! operation.isCompleted())
       expirationReaper.enqueue(operation)
-    }
 
     false
   }


Mime
View raw message