kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6879; Invoke session init callbacks outside lock to avoid Controller deadlock (#4977)
Date Tue, 08 May 2018 22:52:48 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bce1079  KAFKA-6879; Invoke session init callbacks outside lock to avoid Controller deadlock (#4977)
bce1079 is described below

commit bce10794a016f86cda63ceb437d8db7dfc65063b
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue May 8 15:52:43 2018 -0700

    KAFKA-6879; Invoke session init callbacks outside lock to avoid Controller deadlock (#4977)
    
    Fixes a deadlock between the controller's beforeInitializingSession callback, which holds
    the ZooKeeper client initialization lock while awaiting completion of an asynchronous event
    that itself depends on the same lock.
    
    Also catch and log callback exceptions to ensure the ZooKeeper reconnection takes place.
    Finally, configure KafkaScheduler in ZooKeeperClient to have at least 1 thread.
    
    Added tests that fail or hang without the changes in this PR.
    
    Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../scala/kafka/controller/KafkaController.scala   | 13 ++--
 .../main/scala/kafka/utils/KafkaScheduler.scala    |  8 ++-
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    | 74 +++++++++++++-------
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 80 +++++++++++++++++++++-
 4 files changed, 140 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index bc721e39..d3d1a81 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -160,7 +160,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       override def beforeInitializingSession(): Unit = {
         val expireEvent = new Expire
         eventManager.clearAndPut(expireEvent)
-        expireEvent.waitUntilProcessed()
+
+        // Block initialization of the new session until the expiration event is being handled,
+        // which ensures that all pending events have been processed before creating the new session
+        expireEvent.waitUntilProcessingStarted()
       }
     })
     eventManager.put(Startup)
@@ -1518,17 +1521,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   // We can't make this a case object due to the countDownLatch field
   class Expire extends ControllerEvent {
-    private val countDownLatch = new CountDownLatch(1)
+    private val processingStarted = new CountDownLatch(1)
     override def state = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      countDownLatch.countDown()
+      processingStarted.countDown()
       activeControllerId = -1
       onControllerResignation()
     }
 
-    def waitUntilProcessed(): Unit = {
-      countDownLatch.await()
+    def waitUntilProcessingStarted(): Unit = {
+      processingStarted.await()
     }
   }
 
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 5407934..24eb177 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -99,11 +99,15 @@ class KafkaScheduler(val threads: Int,
     }
   }
 
-  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) {
+  def scheduleOnce(name: String, fun: () => Unit): Unit = {
+    schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+  }
+
+  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) {
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning
+      ensureRunning()
       val runnable = CoreUtils.runnable {
         try {
           trace("Beginning execution of scheduled task '%s'.".format(name))
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5c4cd68..5cb127c 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -59,7 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
-  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -325,43 +325,65 @@ class ZooKeeperClient(connectString: String,
     zooKeeper
   }
   
-  private def initialize(): Unit = {
-    if (!connectionState.isAlive) {
-      zooKeeper.close()
-      info(s"Initializing a new session to $connectString.")
-      // retry forever until ZooKeeper can be instantiated
-      var connected = false
-      while (!connected) {
-        try {
-          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
-          connected = true
-        } catch {
-          case e: Exception =>
-            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
-            Thread.sleep(1000)
+  private def reinitialize(): Unit = {
+    // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
+    // may require additional Zookeeper requests, which will block to acquire the initialization lock
+    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
+
+    inWriteLock(initializationLock) {
+      if (!connectionState.isAlive) {
+        zooKeeper.close()
+        info(s"Initializing a new session to $connectString.")
+        // retry forever until ZooKeeper can be instantiated
+        var connected = false
+        while (!connected) {
+          try {
+            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
+            connected = true
+          } catch {
+            case e: Exception =>
+              info("Error when recreating ZooKeeper, retrying after a short sleep", e)
+              Thread.sleep(1000)
+          }
         }
       }
     }
+
+    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
   }
 
   /**
-   * reinitialize method to use in unit tests
+   * Close the zookeeper client to force session reinitialization. This is visible for testing only.
    */
-  private[zookeeper] def reinitialize(): Unit = {
+  private[zookeeper] def forceReinitialize(): Unit = {
     zooKeeper.close()
-    initialize()
+    reinitialize()
+  }
+
+  private def callBeforeInitializingSession(handler: StateChangeHandler): Unit = {
+    try {
+      handler.beforeInitializingSession()
+    } catch {
+      case t: Throwable =>
+        error(s"Uncaught error in handler ${handler.name}", t)
+    }
+  }
+
+  private def callAfterInitializingSession(handler: StateChangeHandler): Unit = {
+    try {
+      handler.afterInitializingSession()
+    } catch {
+      case t: Throwable =>
+        error(s"Uncaught error in handler ${handler.name}", t)
+    }
   }
 
   // Visibility for testing
   private[zookeeper] def scheduleSessionExpiryHandler(): Unit = {
-    expiryScheduler.schedule("zk-session-expired", () => {
-      inWriteLock(initializationLock) {
-        info("Session expired.")
-        stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-        initialize()
-        stateChangeHandlers.values.foreach(_.afterInitializingSession())
-      }
-    }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+    expiryScheduler.scheduleOnce("zk-session-expired", () => {
+      info("Session expired.")
+      reinitialize()
+    })
   }
 
   // package level visibility for testing only
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 77e11ea..c4143e2 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -18,7 +18,7 @@ package kafka.zookeeper
 
 import java.nio.charset.StandardCharsets
 import java.util.UUID
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors, Semaphore, TimeUnit}
 
 import com.yammer.metrics.Metrics
@@ -305,6 +305,82 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testBlockOnRequestCompletionFromStateChangeHandler(): Unit = {
+    // This tests the scenario exposed by KAFKA-6879 in which the expiration callback awaits
+    // completion of a request which is handled by another thread
+
+    val latch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        latch.await()
+      }
+    }
+
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+      "testMetricGroup", "testMetricType")
+    client.registerStateChangeHandler(stateChangeHandler)
+
+    val requestThread = new Thread() {
+      override def run(): Unit = {
+        try
+          client.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+            ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+        finally
+          latch.countDown()
+      }
+    }
+
+    val reinitializeThread = new Thread() {
+      override def run(): Unit = {
+        client.forceReinitialize()
+      }
+    }
+
+    try {
+      reinitializeThread.start()
+      // sleep briefly before starting the request thread so that the initialization
+      // thread is blocking on the latch
+      Thread.sleep(100)
+      requestThread.start()
+      reinitializeThread.join()
+      requestThread.join()
+    } finally client.close() // the client is owned by this test, so release it here
+  }
+
+  @Test
+  def testExceptionInBeforeInitializingSession(): Unit = {
+    // A handler that throws must neither stop other handlers from running nor leave the client unusable
+    val faultyHandler = new StateChangeHandler {
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        throw new RuntimeException()
+      }
+    }
+    val goodCalls = new AtomicInteger(0) // outside the handler: avoids a reflective structural-type access
+    val goodHandler = new StateChangeHandler {
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        goodCalls.incrementAndGet()
+      }
+    }
+
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+      "testMetricGroup", "testMetricType")
+    try {
+      client.registerStateChangeHandler(faultyHandler)
+      client.registerStateChangeHandler(goodHandler)
+      client.forceReinitialize()
+      assertEquals(1, goodCalls.get)
+
+      // The client under test (not the harness client) should be usable even if a callback throws
+      val createResponse = client.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+        ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+      assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    } finally client.close()
+  }
+
+  @Test
   def testZNodeChildChangeHandlerForChildChange(): Unit = {
     import scala.collection.JavaConverters._
     val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
@@ -343,7 +419,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       "testMetricGroup", "testMetricType")
     try {
       zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
-      zooKeeperClient.reinitialize()
+      zooKeeperClient.forceReinitialize()
 
       assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
     } finally zooKeeperClient.close()

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.

Mime
View raw message