kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3069: Fix recursion in ZkSecurityMigrator
Date Tue, 12 Jan 2016 17:49:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3e5afbfa0 -> 72eebad43


KAFKA-3069: Fix recursion in ZkSecurityMigrator

I'm also fixing a bug in the testChroot test case.

Author: Flavio Junqueira <fpj@apache.org>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #736 from fpj/KAFKA-3069


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/72eebad4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/72eebad4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/72eebad4

Branch: refs/heads/trunk
Commit: 72eebad43d5aaf4bbd29532eedc2a793fc3ee9d5
Parents: 3e5afbf
Author: Flavio Junqueira <fpj@apache.org>
Authored: Tue Jan 12 09:48:47 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Jan 12 09:48:47 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/admin/ZkSecurityMigrator.scala  | 26 +++++++++++---
 .../security/auth/ZkAuthorizationTest.scala     | 36 +++++++++++++-------
 2 files changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/72eebad4/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 8e2f040..2080879 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -128,16 +128,33 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
   private val workQueue = new LinkedBlockingQueue[Runnable]
   private val futures = new Queue[Future[String]]
 
-  private def setAclsRecursively(path: String) = {
+  private def setAcl(path: String, setPromise: Promise[String]) = {
     info("Setting ACL for path %s".format(path))
+    zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure),
-1, SetACLCallback, setPromise)
+  }
+
+  private def getChildren(path: String, childrenPromise: Promise[String]) = {
+    info("Getting children to set ACLs for path %s".format(path))
+    zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
+  }
+
+  private def setAclIndividually(path: String) = {
+    val setPromise = Promise[String]
+    futures.synchronized {
+      futures += setPromise.future
+    }
+    setAcl(path, setPromise)
+  }
+
+  private def setAclsRecursively(path: String) = {
     val setPromise = Promise[String]
     val childrenPromise = Promise[String]
     futures.synchronized {
       futures += setPromise.future
       futures += childrenPromise.future
     }
-    zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure),
-1, SetACLCallback, setPromise)
-    zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
+    setAcl(path, setPromise)
+    getChildren(path, childrenPromise)
   }
 
   private object GetChildrenCallback extends ChildrenCallback {
@@ -205,11 +222,12 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
 
   private def run(): Unit = {
     try {
+      setAclIndividually("/")
       for (path <- zkUtils.securePersistentZkPaths) {
         debug("Going to set ACL for %s".format(path))
         zkUtils.makeSurePersistentPathExists(path)
+        setAclsRecursively(path)
       }
-      setAclsRecursively("/")
       
       @tailrec
       def recurse(): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/72eebad4/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index c4e4299..2d73f4d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -118,7 +118,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   def testZkMigration() {
     val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) 
     try {
-      testMigration(unsecureZkUtils, zkUtils)
+      testMigration(zkConnect, unsecureZkUtils, zkUtils)
     } finally {
       unsecureZkUtils.close()
     }
@@ -132,7 +132,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   def testZkAntiMigration() {
     val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
     try {
-      testMigration(zkUtils, unsecureZkUtils)
+      testMigration(zkConnect, zkUtils, unsecureZkUtils)
     } finally {
       unsecureZkUtils.close()
     }
@@ -169,11 +169,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
    */
   @Test
   def testChroot {
+    val zkUrl = zkConnect + "/kafka"
     zkUtils.createPersistentPath("/kafka")
-    val unsecureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, false)
-    val secureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, true)
+    val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)
+    val secureZkUtils = ZkUtils(zkUrl, 6000, 6000, true)
     try {
-      testMigration(unsecureZkUtils, secureZkUtils)
+      testMigration(zkUrl, unsecureZkUtils, secureZkUtils)
     } finally {
       unsecureZkUtils.close()
       secureZkUtils.close()
@@ -181,11 +182,11 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   }
 
   /**
-   * Exercises the migration tool. It is used by two test cases:
-   * testZkMigration and testZkAntiMigration.
+   * Exercises the migration tool. It is used in these test cases:
+   * testZkMigration, testZkAntiMigration, testChroot.
    */
-  private def testMigration(firstZk: ZkUtils, secondZk: ZkUtils) {
-    info(s"zkConnect string: $zkConnect")
+  private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) {
+    info(s"zkConnect string: $zkUrl")
     for (path <- firstZk.securePersistentZkPaths) {
       info(s"Creating $path")
       firstZk.makeSurePersistentPathExists(path)
@@ -193,11 +194,19 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       // traversal of the data tree
       firstZk.createPersistentPath(s"$path/fpjwashere", "")
     }
+    // Getting security option to determine how to verify ACLs.
+    // Additionally, we create the consumers znode (not in
+    // securePersistentZkPaths) to make sure that we don't
+    // add ACLs to it.
     val secureOpt: String  = secondZk.isSecure match {
-      case true => "secure"
-      case false => "unsecure"
+      case true =>
+        firstZk.createPersistentPath(ZkUtils.ConsumersPath)
+        "secure"
+      case false =>
+        secondZk.createPersistentPath(ZkUtils.ConsumersPath)
+        "unsecure"
     }
-    ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkConnect"))
+    ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
     for (path <- secondZk.securePersistentZkPaths) {
       val listParent = (secondZk.zkConnection.getAcl(path)).getKey
@@ -207,6 +216,9 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       val listChild = (secondZk.zkConnection.getAcl(childPath)).getKey
       assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure))
     }
+    // Check consumers path.
+    val consumersAcl = (firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath)).getKey
+    assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false))
   }
 
   /**


Mime
View raw message