kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
Date Thu, 07 Mar 2019 10:07:42 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 78648da  KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
78648da is described below

commit 78648dab04f5b9edf598c73d0cf3b11e529e9c45
Author: Jun Rao <junrao@gmail.com>
AuthorDate: Thu Mar 7 15:33:54 2019 +0530

    KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
    
    Disable forceSync in EmbeddedZookeeper.
    Increase ZK tick to allow longer maxSessionTimeout in tests.
    Increase ZK client session timeout in tests.
    Handle transient ZK session expiration exception in test utils for createTopic.
    
    Author: Jun Rao <junrao@gmail.com>
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
    
    Closes #6354 from junrao/KAFKA-8018
---
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 25 ++++++++++++++++++++--
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala    |  4 +++-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  2 +-
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c582bc5..3afe520 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.apache.zookeeper.KeeperException.SessionExpiredException
 import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
 import org.junit.Assert._
@@ -301,7 +302,17 @@ object TestUtils extends Logging {
                   topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int,
Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
-    adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
+    TestUtils.waitUntilTrue( () => {
+      var hasSessionExpirationException = false
+      try {
+        adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
+      } catch {
+        case _: SessionExpiredException => hasSessionExpirationException = true
+        case e => throw e // let other exceptions propagate
+      }
+      !hasSessionExpirationException},
+      s"Can't create topic $topic")
+
     // wait until the update metadata request for new topic reaches all servers
     (0 until numPartitions).map { i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
@@ -333,7 +344,17 @@ object TestUtils extends Logging {
                   topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
-    adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
+    TestUtils.waitUntilTrue( () => {
+      var hasSessionExpirationException = false
+      try {
+        adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
+      } catch {
+        case _: SessionExpiredException => hasSessionExpirationException = true
+        case e => throw e // let other exceptions propagate
+      }
+      !hasSessionExpirationException},
+      s"Can't create topic $topic")
+
     // wait until the update metadata request for new topic reaches all servers
     partitionReplicaAssignment.keySet.map { i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index d4a829d..32ca969 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -38,7 +38,9 @@ class EmbeddedZookeeper() extends Logging {
 
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
-  val tickTime = 500
+  val tickTime = 800 // allow a maxSessionTimeout of 20 * 800ms = 16 secs
+
+  System.setProperty("zookeeper.forceSync", "no")  //disable fsync to ZK txn log in tests to avoid timeout
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val factory = new NIOServerCnxnFactory()
   private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 2f75fa2..ebb5fb7 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -40,7 +40,7 @@ import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   val zkConnectionTimeout = 10000
-  val zkSessionTimeout = 6000
+  val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
   val zkMaxInFlightRequests = Int.MaxValue
 
   protected def zkAclsEnabled: Option[Boolean] = None


Mime
View raw message