kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (#5411)
Date Sun, 22 Jul 2018 16:50:56 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 4a00b79  MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction
(#5411)
4a00b79 is described below

commit 4a00b79d12761bd55fc01b1be2d9c60788945233
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Sun Jul 22 22:19:32 2018 +0530

    MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (#5411)
    
    This has always been an issue, but the recent upgrade to ZooKeeper
    3.4.13 means it is also an issue when an unresolvable ZK
    address is used, causing some tests to leak threads.
    
    The change in behaviour in ZK 3.4.13 is that no exception is thrown
    from the ZooKeeper constructor in case of an unresolvable address.
    Instead, ZooKeeper tries to re-resolve the address hoping it becomes
    resolvable again. We eventually throw a
    `ZooKeeperClientTimeoutException`, which is similar to the case
    where the the address is resolvable but ZooKeeper is not
    reachable.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../src/main/scala/kafka/zookeeper/ZooKeeperClient.scala |  7 ++++++-
 .../scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 16 +++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5cb127c..97ec9a4 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -92,7 +92,12 @@ class ZooKeeperClient(connectString: String,
   metricNames += "SessionState"
 
   expiryScheduler.startup()
-  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+  try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+  catch {
+    case e: Throwable =>
+      close()
+      throw e
+  }
 
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]):
MetricName = {
     explicitMetricName(metricGroup, metricType, name, metricTags)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index fcbf699..0088c65 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -57,12 +57,22 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
   }
 
-  @Test(expected = classOf[ZooKeeperClientTimeoutException])
+  @Test
   def testUnresolvableConnectString(): Unit = {
-    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", zkSessionTimeout, connectionTimeoutMs
= 10,
-      Int.MaxValue, time, "testMetricGroup", "testMetricType").close()
+    try {
+      new ZooKeeperClient("some.invalid.hostname.foo.bar.local", zkSessionTimeout, connectionTimeoutMs
= 10,
+        Int.MaxValue, time, "testMetricGroup", "testMetricType")
+    } catch {
+      case e: ZooKeeperClientTimeoutException =>
+        assertEquals("ZooKeeper client threads still running", Set.empty,  runningZkSendThreads)
+    }
   }
 
+  private def runningZkSendThreads: collection.Set[String] = Thread.getAllStackTraces.keySet.asScala
+    .filter(_.isAlive)
+    .map(_.getName)
+    .filter(t => t.contains("SendThread()"))
+
   @Test(expected = classOf[ZooKeeperClientTimeoutException])
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()


Mime
View raw message