From commits-return-2387-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Wed Jun 10 18:29:12 2015 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4126F184AD for ; Wed, 10 Jun 2015 18:29:12 +0000 (UTC) Received: (qmail 6633 invoked by uid 500); 10 Jun 2015 18:29:12 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 6607 invoked by uid 500); 10 Jun 2015 18:29:12 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 6598 invoked by uid 99); 10 Jun 2015 18:29:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 18:29:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D87A4E0385; Wed, 10 Jun 2015 18:29:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: <5b91aa79ebc9489f92bf95f39f394328@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-2253; fix deadlock between removeWatchersLock and watcher operations list lock; reviewed by Onur Karaman and Jiangjie Qin Date: Wed, 10 Jun 2015 18:29:11 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk ca6d01bc6 -> 9f80665ec KAFKA-2253; fix deadlock between removeWatchersLock and watcher operations list lock; reviewed by Onur Karaman and Jiangjie Qin Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f80665e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f80665e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f80665e Branch: refs/heads/trunk Commit: 9f80665ec6deff8525b61096034af8dc0cc9a03c Parents: ca6d01b Author: Guozhang Wang Authored: Wed Jun 10 11:28:53 2015 -0700 Committer: Guozhang Wang Committed: Wed Jun 10 11:28:53 2015 -0700 ---------------------------------------------------------------------- .../scala/kafka/server/DelayedOperation.scala | 45 +++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9f80665e/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 123078d..0b53532 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -189,8 +189,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // If the operation is already completed, stop adding it to the rest of the watcher list. if (operation.isCompleted()) return false - val watchers = watchersFor(key) - watchers.watch(operation) + watchForOperation(key, operation) if (!watchCreated) { watchCreated = true @@ -241,22 +240,34 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br def delayed() = timeoutTimer.size /* - * Return the watch list of the given key + * Return all the current watcher lists, + * note that the returned watchers may be removed from the list by other threads */ - private def watchersFor(key: Any) = inReadLock(removeWatchersLock) { watchersForKey.getAndMaybePut(key) } + private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values } /* - * Return all the current watcher lists + * Return the watch list of the given key, note that we need to + * grab the removeWatchersLock to avoid the operation being added to a removed watcher list */ - private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values } + private def watchForOperation(key: Any, operation: T) { + inReadLock(removeWatchersLock) { + val watcher = watchersForKey.getAndMaybePut(key) + watcher.watch(operation) + } + } /* * Remove the key from watcher lists if its list is empty */ - private def removeKeyIfEmpty(key: Any) = inWriteLock(removeWatchersLock) { - val watchers = watchersForKey.get(key) - if (watchers != null && watchers.watched == 0) { - watchersForKey.remove(key) + private def removeKeyIfEmpty(key: Any, watchers: Watchers) { + inWriteLock(removeWatchersLock) { + // if the current key is no longer correlated to the watchers to remove, skip + if (watchersForKey.get(key) != watchers) + return + + if (watchers != null && watchers.watched == 0) { + watchersForKey.remove(key) + } } } @@ -298,10 +309,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br iter.remove() } } - - if (operations.size == 0) - removeKeyIfEmpty(key) } + + if (operations.size == 0) + removeKeyIfEmpty(key, this) + completed } @@ -317,10 +329,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br purged += 1 } } - - if (operations.size == 0) - removeKeyIfEmpty(key) } + + if (operations.size == 0) + removeKeyIfEmpty(key, this) + purged } }