Repository: kafka Updated Branches: refs/heads/trunk ca6d01bc6 -> 9f80665ec KAFKA-2253; fix deadlock between removeWatchersLock and watcher operations list lock; reviewed by Onur Karaman and Jiangjie Qin Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f80665e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f80665e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f80665e Branch: refs/heads/trunk Commit: 9f80665ec6deff8525b61096034af8dc0cc9a03c Parents: ca6d01b Author: Guozhang Wang Authored: Wed Jun 10 11:28:53 2015 -0700 Committer: Guozhang Wang Committed: Wed Jun 10 11:28:53 2015 -0700 ---------------------------------------------------------------------- .../scala/kafka/server/DelayedOperation.scala | 45 +++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9f80665e/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 123078d..0b53532 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -189,8 +189,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // If the operation is already completed, stop adding it to the rest of the watcher list. if (operation.isCompleted()) return false - val watchers = watchersFor(key) - watchers.watch(operation) + watchForOperation(key, operation) if (!watchCreated) { watchCreated = true @@ -241,22 +240,34 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br def delayed() = timeoutTimer.size /* - * Return the watch list of the given key + * Return all the current watcher lists, + * note that the returned watchers may be removed from the list by other threads */ - private def watchersFor(key: Any) = inReadLock(removeWatchersLock) { watchersForKey.getAndMaybePut(key) } + private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values } /* - * Return all the current watcher lists + * Return the watch list of the given key, note that we need to + * grab the removeWatchersLock to avoid the operation being added to a removed watcher list */ - private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values } + private def watchForOperation(key: Any, operation: T) { + inReadLock(removeWatchersLock) { + val watcher = watchersForKey.getAndMaybePut(key) + watcher.watch(operation) + } + } /* * Remove the key from watcher lists if its list is empty */ - private def removeKeyIfEmpty(key: Any) = inWriteLock(removeWatchersLock) { - val watchers = watchersForKey.get(key) - if (watchers != null && watchers.watched == 0) { - watchersForKey.remove(key) + private def removeKeyIfEmpty(key: Any, watchers: Watchers) { + inWriteLock(removeWatchersLock) { + // if the current key is no longer correlated to the watchers to remove, skip + if (watchersForKey.get(key) != watchers) + return + + if (watchers != null && watchers.watched == 0) { + watchersForKey.remove(key) + } } } @@ -298,10 +309,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br iter.remove() } } - - if (operations.size == 0) - removeKeyIfEmpty(key) } + + if (operations.size == 0) + removeKeyIfEmpty(key, this) + completed } @@ -317,10 +329,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br purged += 1 } } - - if (operations.size == 0) - removeKeyIfEmpty(key) } + + if (operations.size == 0) + removeKeyIfEmpty(key, this) + purged } }