kafka-commits mailing list archives

From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (#5411)
Date Sun, 22 Jul 2018 16:49:40 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5db2f99  MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (#5411)
5db2f99 is described below

commit 5db2f9903a1e1d9fe574730e89aec0022333db71
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Sun Jul 22 22:19:32 2018 +0530

    MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (#5411)
    
    This has always been an issue, but the recent upgrade to ZooKeeper
    3.4.13 means it is also an issue when an unresolvable ZK
    address is used, causing some tests to leak threads.
    
    The change in behaviour in ZK 3.4.13 is that no exception is thrown
    from the ZooKeeper constructor in case of an unresolvable address.
    Instead, ZooKeeper tries to re-resolve the address hoping it becomes
    resolvable again. We eventually throw a
    `ZooKeeperClientTimeoutException`, which is similar to the case
    where the address is resolvable but ZooKeeper is not
    reachable.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../src/main/scala/kafka/zookeeper/ZooKeeperClient.scala |  7 ++++++-
 .../scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 16 +++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)
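
The commit message describes a general pattern: when a constructor has already started background resources and a later step fails, release those resources before rethrowing. The following is a minimal, self-contained sketch of that pattern, not the Kafka implementation. `DemoClient`, `demo-send-thread`, and `workerAlive` are hypothetical names invented for illustration, and the "connection" here is a latch that is never released, so construction always times out.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}

// Hypothetical stand-in for a client that starts a background thread in its
// constructor. None of these names come from Kafka or ZooKeeper.
class DemoClient(connectTimeoutMs: Long) extends AutoCloseable {
  // Never counted down in this sketch, so the wait below always times out.
  private val connected = new CountDownLatch(1)
  private val stopped = new CountDownLatch(1)

  // Background thread started during construction; it runs until close().
  private val worker = new Thread(new Runnable {
    override def run(): Unit = stopped.await()
  }, DemoClient.WorkerName)
  worker.setDaemon(true)
  worker.start()

  // If waiting fails, release what the constructor already started, then
  // rethrow the original exception to the caller.
  try waitUntilConnected()
  catch {
    case e: Throwable =>
      close()
      throw e
  }

  private def waitUntilConnected(): Unit =
    if (!connected.await(connectTimeoutMs, TimeUnit.MILLISECONDS))
      throw new TimeoutException(s"Not connected after $connectTimeoutMs ms")

  override def close(): Unit = {
    stopped.countDown() // lets the worker's run() return
    worker.join()       // wait for the worker thread to exit
  }
}

object DemoClient {
  val WorkerName = "demo-send-thread"

  // True if a live thread with the worker's name still exists in this JVM.
  def workerAlive: Boolean = {
    val threads = Thread.getAllStackTraces.keySet.iterator()
    var alive = false
    while (threads.hasNext) {
      val t = threads.next()
      if (t.getName == WorkerName && t.isAlive) alive = true
    }
    alive
  }
}

object DemoClientExample {
  def main(args: Array[String]): Unit = {
    val failed =
      try { new DemoClient(10L); false }
      catch { case _: TimeoutException => true }
    assert(failed, "construction should time out in this sketch")
    assert(!DemoClient.workerAlive, "worker thread should have been stopped")
  }
}
```

Without the `try`/`catch` around `waitUntilConnected()`, the caller would never receive a reference to the half-constructed object and so could not call `close()` on it, which is how a thread leak of this kind can arise.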

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5cb127c..97ec9a4 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -92,7 +92,12 @@ class ZooKeeperClient(connectString: String,
   metricNames += "SessionState"
 
   expiryScheduler.startup()
-  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+  try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+  catch {
+    case e: Throwable =>
+      close()
+      throw e
+  }
 
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
     explicitMetricName(metricGroup, metricType, name, metricTags)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index fcbf699..0088c65 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -57,12 +57,22 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
   }
 
-  @Test(expected = classOf[ZooKeeperClientTimeoutException])
+  @Test
   def testUnresolvableConnectString(): Unit = {
-    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", zkSessionTimeout, connectionTimeoutMs = 10,
-      Int.MaxValue, time, "testMetricGroup", "testMetricType").close()
+    try {
+      new ZooKeeperClient("some.invalid.hostname.foo.bar.local", zkSessionTimeout, connectionTimeoutMs = 10,
+        Int.MaxValue, time, "testMetricGroup", "testMetricType")
+    } catch {
+      case e: ZooKeeperClientTimeoutException =>
+        assertEquals("ZooKeeper client threads still running", Set.empty,  runningZkSendThreads)
+    }
   }
 
+  private def runningZkSendThreads: collection.Set[String] = Thread.getAllStackTraces.keySet.asScala
+    .filter(_.isAlive)
+    .map(_.getName)
+    .filter(t => t.contains("SendThread()"))
+
   @Test(expected = classOf[ZooKeeperClientTimeoutException])
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()
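
In the updated `testUnresolvableConnectString` shown in the diff, the thread assertion runs only inside the `catch` clause, and the `@Test(expected = ...)` attribute was removed, so as written the test would appear to pass if no exception were thrown at all. The following is a hedged sketch of one way to require the exception and still run follow-up checks. `ExpectFailure` and `expectThrows` are hypothetical helpers written for illustration; they are not part of Kafka or JUnit 4.

```scala
import scala.reflect.ClassTag

object ExpectFailure {
  // Runs `body`, requires that it throws an exception of type E, and returns
  // that exception so the caller can make further assertions afterwards.
  def expectThrows[E <: Throwable](body: => Any)(implicit tag: ClassTag[E]): E = {
    val expected = tag.runtimeClass.getName
    val outcome: Option[Throwable] =
      try { body; None }
      catch { case e: Throwable => Some(e) }
    outcome match {
      case Some(e) if tag.runtimeClass.isInstance(e) =>
        e.asInstanceOf[E]
      case Some(other) =>
        throw new AssertionError(s"Expected $expected but got $other", other)
      case None =>
        throw new AssertionError(s"Expected $expected but nothing was thrown")
    }
  }
}

object ExpectFailureExample {
  def main(args: Array[String]): Unit = {
    // The expected exception is returned, so checks can follow it.
    val e = ExpectFailure.expectThrows[IllegalStateException] {
      throw new IllegalStateException("boom")
    }
    assert(e.getMessage == "boom")

    // A body that does not throw is reported as a failure.
    val missed =
      try { ExpectFailure.expectThrows[IllegalStateException](()); false }
      catch { case _: AssertionError => true }
    assert(missed)
  }
}
```

With a helper of this shape, a check such as the `runningZkSendThreads` assertion could be placed after the call that is expected to throw, so that it runs exactly when the exception was observed and the test fails otherwise.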

