kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8121; Shutdown ZK client expiry handler earlier during close (#6462)
Date Mon, 18 Mar 2019 18:12:59 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new f082b60  KAFKA-8121; Shutdown ZK client expiry handler earlier during close (#6462)
f082b60 is described below

commit f082b6063ee4e27183d2c2df34986d574829f538
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Mar 18 18:07:24 2019 +0000

    KAFKA-8121; Shutdown ZK client expiry handler earlier during close (#6462)
    
    Shutdown session expiry thread prior to closing ZooKeeper client to ensure that new clients are not created by the expiry thread and left active when returning from ZooKeeperClient.close().
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala        | 9 ++++++---
 .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala    | 3 +++
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 4d5ef96..ad4da8b 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -320,6 +320,12 @@ class ZooKeeperClient(connectString: String,
 
   def close(): Unit = {
     info("Closing.")
+
+    // Shutdown scheduler outside of lock to avoid deadlock if scheduler
+    // is waiting for lock to process session expiry. Close expiry thread
+    // first to ensure that new clients are not created during close().
+    expiryScheduler.shutdown()
+
     inWriteLock(initializationLock) {
       zNodeChangeHandlers.clear()
       zNodeChildChangeHandlers.clear()
@@ -327,9 +333,6 @@ class ZooKeeperClient(connectString: String,
       zooKeeper.close()
       metricNames.foreach(removeMetric(_))
     }
-    // Shutdown scheduler outside of lock to avoid deadlock if scheduler
-    // is waiting for lock to process session expiry
-    expiryScheduler.shutdown()
     info("Closed.")
   }
 
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index e943348..a4b63e0 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -43,6 +43,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Before
   override def setUp() {
+    ZooKeeperTestHarness.verifyNoUnexpectedThreads("@Before")
     cleanMetricsRegistry()
     super.setUp()
     zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
@@ -55,6 +56,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       zooKeeperClient.close()
     super.tearDown()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    ZooKeeperTestHarness.verifyNoUnexpectedThreads("@After")
   }
 
   @Test
@@ -596,6 +598,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
         }
       })
       assertFalse("Close completed without shutting down expiry scheduler gracefully", closeFuture.isDone)
+      assertTrue(zooKeeperClient.currentZooKeeper.getState.isAlive) // Client should be closed after expiry handler
       semaphore.release()
       closeFuture.get(10, TimeUnit.SECONDS)
       assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted)


Mime
View raw message