kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-4735; Fix deadlock issue during MM shutdown
Date Tue, 07 Feb 2017 00:02:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2c0055e62 -> 45d9fb3d5


KAFKA-4735; Fix deadlock issue during MM shutdown

In https://issues.apache.org/jira/browse/KAFKA-4521 we fixed a potential message reordering bug
in MM. However, that patch introduced another bug that can cause a deadlock during MM shutdown.
The deadlock happens if the ZooKeeper listener thread calls `requestAndWaitForCommit()` after
the MirrorMaker thread has already exited its loop of consuming and producing messages.

This patch fixes the problem by setting `iter` to `null` in `MirrorMakerOldConsumer.cleanup()`.
If the ZooKeeper listener thread calls `requestAndWaitForCommit()` after `cleanup()`, it
will not block waiting for a commit notification, since `iter == null`. If the ZooKeeper listener
thread calls `requestAndWaitForCommit()` before `cleanup()`, then `cleanup()` will call `notifyAll()`
(via `notifyCommit()`) to unblock the ZooKeeper listener thread.
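
The two orderings described above can be sketched in isolation as follows. This is
a minimal illustration, not the MirrorMaker code: `ConsumerSketch`, `isCommitRequested`
and `ShutdownDemo` are hypothetical names, the body of `notifyCommit()` is an assumption
(the diff does not show it), and the `while` loop around `wait()` is an addition of this
sketch to guard against spurious wakeups.

```scala
// Hypothetical stand-in for MirrorMakerOldConsumer; only the monitor logic is modeled.
class ConsumerSketch {
  // Non-null while the consumer is initialized and has not been cleaned up.
  private var iter: Iterator[String] = Iterator.empty
  private var immediateCommitRequested = false

  // Called by the rebalance (ZooKeeper) listener thread.
  def requestAndWaitForCommit(): Unit = this.synchronized {
    // Only wait if the consumer is initialized and not yet cleaned up.
    if (iter != null) {
      immediateCommitRequested = true
      // Sketch-only loop: re-check the flag after every wakeup.
      while (immediateCommitRequested) this.wait()
    }
  }

  // Assumed behaviour: clear the request flag and wake any waiting thread.
  def notifyCommit(): Unit = this.synchronized {
    immediateCommitRequested = false
    this.notifyAll()
  }

  // Mirrors the patched cleanup(): null out iter, then release a waiting listener.
  def cleanup(): Unit = this.synchronized {
    iter = null
    if (immediateCommitRequested) notifyCommit()
  }

  // Helper for the demo only: lets another thread observe a pending request.
  def isCommitRequested: Boolean = this.synchronized(immediateCommitRequested)
}

object ShutdownDemo {
  def main(args: Array[String]): Unit = {
    // Case 1: the listener arrives after cleanup() and returns without waiting.
    val a = new ConsumerSketch
    a.cleanup()
    a.requestAndWaitForCommit()

    // Case 2: the listener arrives first and blocks until cleanup() notifies it.
    val b = new ConsumerSketch
    val listener = new Thread(new Runnable {
      def run(): Unit = b.requestAndWaitForCommit()
    })
    listener.start()
    while (!b.isCommitRequested) Thread.sleep(10) // wait until the listener is parked
    b.cleanup()
    listener.join(5000)
    println(s"listener still blocked: ${listener.isAlive}")
  }
}
```

Because `iter = null` and the notification happen under the same monitor that
`requestAndWaitForCommit()` uses, the listener either sees `iter == null` and skips
the wait, or is already waiting and gets woken up; in this sketch neither ordering
leaves it blocked.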

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>

Closes #2504 from lindong28/KAFKA-4735


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45d9fb3d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45d9fb3d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45d9fb3d

Branch: refs/heads/trunk
Commit: 45d9fb3d5f13bef0bf6809ea71b2cbd73996a1b6
Parents: 2c0055e
Author: Dong Lin <lindong28@gmail.com>
Authored: Mon Feb 6 16:01:59 2017 -0800
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Mon Feb 6 16:01:59 2017 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45d9fb3d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 42456f7..a2866ad 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -527,7 +527,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     override def requestAndWaitForCommit() {
       this.synchronized {
-        // skip wait() if mirrorMakerConsumer has not been initialized
+        // only wait() if mirrorMakerConsumer has been initialized and it has not been cleaned up.
         if (iter != null) {
           immediateCommitRequested = true
           this.wait()
@@ -566,6 +566,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     override def cleanup() {
+      // We need to set the iterator to null and notify the rebalance listener thread.
+      // This is to handle the case that the consumer rebalance is triggered when the
+      // mirror maker thread is shutting down and the rebalance listener is waiting for the offset commit.
+      this.synchronized {
+        iter = null
+        if (immediateCommitRequested) {
+          notifyCommit()
+        }
+      }
       connector.shutdown()
     }
 

