kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3783; Catch proper exception on path delete
Date Tue, 07 Jun 2016 00:38:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 6500b53c7 -> 780bc646d


KAFKA-3783; Catch proper exception on path delete

- ZkClient is used for conditional path deletion and wraps `KeeperException.BadVersionException`
into `ZkBadVersionException`
- add unit test to `SimpleAclAuthorizerTest` to reproduce the issue and catch potential future
regression

Author: Sebastien Launay <sebastien@opendns.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1461 from slaunay/bugfix/KAFKA-3783-zk-conditional-delete-path

(cherry picked from commit f643d1b75d17bb27a378c7e66fcc49607454e445)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/780bc646
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/780bc646
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/780bc646

Branch: refs/heads/0.10.0
Commit: 780bc646d543eab86faaa215a15f51fba7c1c4f8
Parents: 6500b53
Author: Sebastien Launay <sebastien@opendns.com>
Authored: Tue Jun 7 01:22:58 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jun 7 01:38:05 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  4 +-
 .../security/auth/SimpleAclAuthorizerTest.scala | 20 ++++++-
 .../scala/unit/kafka/utils/ZkUtilsTest.scala    | 55 ++++++++++++++++++++
 3 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/780bc646/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index fad2c9c..de4a977 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -515,14 +515,14 @@ class ZkUtils(val zkClient: ZkClient,
 
   /**
     * Conditional delete the persistent path data, return true if it succeeds,
-    * otherwise (the current version is not the expected version)
+    * false otherwise (the current version is not the expected version)
     */
    def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
     try {
       zkClient.delete(path, expectedVersion)
       true
     } catch {
-      case e: KeeperException.BadVersionException => false
+      case e: ZkBadVersionException => false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/780bc646/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 7fcc33d..1f52af4 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -336,12 +336,30 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
       aclId % 10 != 0
     }.toSet
 
-    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 15000)
+    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
 
     TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer, commonResource)
     TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
   }
 
+  @Test
+  def testHighConcurrencyDeletionOfResourceAcls() {
+    val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
+
+    // Alternate authorizer to keep adding and removing zookeeper path
+    val concurrentFuctions = (0 to 50).map { i =>
+      () => {
+        simpleAclAuthorizer.addAcls(Set(acl), resource)
+        simpleAclAuthorizer2.removeAcls(Set(acl), resource)
+      }
+    }
+
+    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
+
+    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
+    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
+  }
+
   private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
     var acls = originalAcls
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/780bc646/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
new file mode 100755
index 0000000..2d81ed9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert._
+import org.junit.Test
+
+class ZkUtilsTest extends ZooKeeperTestHarness {
+
+  val path = "/path"
+
+  @Test
+  def testSuccessfulConditionalDeletePath() {
+    // Given an existing path
+    zkUtils.createPersistentPath(path)
+    val (_, statAfterCreation) = zkUtils.readData(path)
+
+    // Deletion is successful when the version number matches
+    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
+    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
+    assertTrue("Node should be deleted", optionalData.isEmpty)
+
+    // Deletion is successful when the node does not exist too
+    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0))
+  }
+
+  @Test
+  def testAbortedConditionalDeletePath() {
+    // Given an existing path that gets updated
+    zkUtils.createPersistentPath(path)
+    val (_, statAfterCreation) = zkUtils.readData(path)
+    zkUtils.updatePersistentPath(path, "data")
+
+    // Deletion is aborted when the version number does not match
+    assertFalse("Deletion should be aborted", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
+    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
+    assertTrue("Node should still be there", optionalData.isDefined)
+  }
+}


Mime
View raw message