kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12892: Use dedicated root in ZK ACL test (#10821)
Date Sat, 05 Jun 2021 12:18:58 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37a8659  KAFKA-12892: Use dedicated root in ZK ACL test (#10821)
37a8659 is described below

commit 37a86598269330fb3180b8742dc19ee072391dd9
Author: Igor Soarez <soarez@apple.com>
AuthorDate: Sat Jun 5 13:14:48 2021 +0100

    KAFKA-12892: Use dedicated root in ZK ACL test (#10821)
    
    Having the `testChrootExistsAndRootIsLocked` test in a separate `ZookeeperTestHarness`
isn't enough to prevent the ACL changes to the ZK root from affecting other integration tests.
So instead, let's use a dedicated znode for this test. It still works because `makeSurePersistentPathExists`
uses `createRecursive`, which will recurse and act the same for the root or a given znode.
    
     Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 28 +++++++++-
 .../test/scala/unit/kafka/zk/ZkClientAclTest.scala | 63 ----------------------
 2 files changed, 26 insertions(+), 65 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 27c15a9..da10152 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,9 +16,11 @@
 */
 package kafka.zk
 
-import java.util.{Collections, Properties}
+import java.util.{Base64, Collections, Properties}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.security.MessageDigest
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
@@ -51,7 +53,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.client.ZKClientConfig
-import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.data.{ACL, Id, Stat}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -141,6 +143,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testChrootExistsAndRootIsLocked(): Unit = {
+    // chroot is accessible
+    val root = "/testChrootExistsAndRootIsLocked"
+    val chroot = s"$root/chroot"
+    zkClient.makeSurePersistentPathExists(chroot)
+    zkClient.setAcl(chroot, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala)
+
+    // root is inaccessible
+    val scheme = "digest"
+    val id = "test"
+    val pwd = "12345"
+    val digest = Base64.getEncoder.encode(MessageDigest.getInstance("SHA1").digest(s"$id:$pwd".getBytes()))
+    zkClient.currentZooKeeper.addAuthInfo(scheme, digest)
+    zkClient.setAcl(root, Seq(new ACL(ZooDefs.Perms.ALL, new Id(scheme, s"$id:$digest"))))
+
+    // this client won't have access to the root, but the chroot already exists
+    val chrootClient = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, createChrootIfNecessary =
true)
+    chrootClient.close()
+  }
+
+  @Test
   def testSetAndGetConsumerOffset(): Unit = {
     val offset = 123L
     // None if no committed offsets
diff --git a/core/src/test/scala/unit/kafka/zk/ZkClientAclTest.scala b/core/src/test/scala/unit/kafka/zk/ZkClientAclTest.scala
deleted file mode 100644
index 19b78a3..0000000
--- a/core/src/test/scala/unit/kafka/zk/ZkClientAclTest.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.zk
-
-import java.security.MessageDigest
-import java.util.Base64
-
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.ZooDefs
-import org.apache.zookeeper.data.{ACL, Id}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-
-class ZkClientAclTest extends ZooKeeperTestHarness {
-
-  @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    super.tearDown()
-  }
-
-  @Test
-  def testChrootExistsAndRootIsLocked(): Unit = {
-    // chroot is accessible
-    val chroot = "/chroot"
-    zkClient.makeSurePersistentPathExists(chroot)
-    zkClient.setAcl(chroot, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala)
-
-    // root is inaccessible
-    val scheme = "digest"
-    val id = "test"
-    val pwd = "12345"
-    val digest = Base64.getEncoder.encode(MessageDigest.getInstance("SHA1").digest(s"$id:$pwd".getBytes()))
-    zkClient.currentZooKeeper.addAuthInfo(scheme, digest)
-    zkClient.setAcl("/", Seq(new ACL(ZooDefs.Perms.ALL, new Id(scheme, s"$id:$digest"))))
-
-    // this client won't have access to the root, but the chroot already exists
-    val chrootClient = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, createChrootIfNecessary =
true)
-    chrootClient.close()
-  }
-}

Mime
View raw message