kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-4943: Make /config/users with SCRAM credentials not world-readable
Date Fri, 07 Apr 2017 21:07:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 47f9e8fe2 -> 08a5b6421


KAFKA-4943: Make /config/users with SCRAM credentials not world-readable

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma, Jun Rao

Closes #2733 from rajinisivaram/KAFKA-4943

(cherry picked from commit 67fc2a91a6e80bfda30537c7c95cd9ce396d4e7d)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/08a5b642
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/08a5b642
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/08a5b642

Branch: refs/heads/0.10.2
Commit: 08a5b6421c69dec46487ff0136020dd0030fe34a
Parents: 47f9e8f
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Fri Apr 7 11:07:06 2017 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Apr 7 14:07:05 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/admin/ZkSecurityMigrator.scala  |  4 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 93 +++++++++++++-------
 .../security/auth/ZkAuthorizationTest.scala     | 28 +++---
 3 files changed, 80 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/08a5b642/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index eb5c142..172233b 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -127,7 +127,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
 
   private def setAcl(path: String, setPromise: Promise[String]) = {
     info("Setting ACL for path %s".format(path))
-    zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, setPromise)
+    zkUtils.zkConnection.getZookeeper.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, setPromise)
   }
 
   private def getChildren(path: String, childrenPromise: Promise[String]) = {
@@ -201,7 +201,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
           info("Successfully set ACLs for %s".format(path))
           promise success "done"
         case Code.CONNECTIONLOSS =>
-            zkHandle.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, ctx)
+            zkHandle.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, ctx)
         case Code.NONODE =>
           warn("Znode is gone, it could be have been legitimately deleted: %s".format(path))
           promise success "done"

http://git-wip-us.apache.org/repos/asf/kafka/blob/08a5b642/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e67e264..1d77cd7 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,6 +17,7 @@
 
 package kafka.utils
 
+import java.util.Collections
 import java.util.concurrent.CountDownLatch
 
 import kafka.admin._
@@ -65,6 +66,7 @@ object ZkUtils {
   val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
+  val ConfigUsersPath = s"$ConfigPath/users"
 
 
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
@@ -78,6 +80,10 @@ object ZkUtils {
                               KafkaAclPath,
                               KafkaAclChangesPath)
 
+  // Important: it is necessary to add any new top level Zookeeper path that contains
+  //            sensitive information that should not be world readable to the Seq
+  val SensitiveZkRootPaths = Seq(ConfigUsersPath)
+
   def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
     val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
     new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
@@ -101,13 +107,23 @@ object ZkUtils {
     (zkClient, zkConnection)
   }
 
-  def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = if (isSecure) {
-    val list = new java.util.ArrayList[ACL]
-    list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
-    list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
-    list
-  } else {
-    ZooDefs.Ids.OPEN_ACL_UNSAFE
+  def sensitivePath(path: String): Boolean = {
+    path != null && !SensitiveZkRootPaths.forall(!path.startsWith(_))
+  }
+
+  @deprecated("This is deprecated, use defaultAcls(isSecure, path) which doesn't make sensitive data world readable", since = "0.10.2.1")
+  def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = defaultAcls(isSecure, "")
+
+  def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = {
+    if (isSecure) {
+      val list = new java.util.ArrayList[ACL]
+      list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
+      if (!sensitivePath(path)) {
+        list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
+      }
+      list
+    } else
+      ZooDefs.Ids.OPEN_ACL_UNSAFE
   }
 
   def maybeDeletePath(zkUrl: String, dir: String) {
@@ -219,7 +235,11 @@ class ZkUtils(val zkClient: ZkClient,
                               BrokerSequenceIdPath,
                               IsrChangeNotificationPath)
 
-  val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
+  @deprecated("This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable", since = "0.10.2.1")
+  val DefaultAcls: java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, "")
+
+  private val useDefaultAcl  = Collections.emptyList[ACL]
+  def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)
 
   def getController(): Int = {
     readDataMaybeNull(ControllerPath)._1 match {
@@ -412,11 +432,15 @@ class ZkUtils(val zkClient: ZkClient,
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
-  def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = DefaultAcls) {
+  def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = useDefaultAcl) {
     //Consumer path is kept open as different consumers will write under this node.
     val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) {
       ZooDefs.Ids.OPEN_ACL_UNSAFE
-    } else acls
+    } else if (acls == useDefaultAcl) {
+      ZkUtils.defaultAcls(isSecure, path)
+    } else {
+      acls
+    }
 
     if (!zkClient.exists(path))
       ZkPath.createPersistent(zkClient, path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
@@ -425,23 +449,25 @@ class ZkUtils(val zkClient: ZkClient,
   /**
    *  create the parent path
    */
-  private def createParentPath(path: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
+  private def createParentPath(path: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
     val parentDir = path.substring(0, path.lastIndexOf('/'))
     if (parentDir.length != 0) {
-      ZkPath.createPersistent(zkClient, parentDir, createParents = true, acls)
+      ZkPath.createPersistent(zkClient, parentDir, createParents = true, acl)
     }
   }
 
   /**
    * Create an ephemeral node with the given path and data. Create parents if necessary.
    */
-  private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
+  private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      ZkPath.createEphemeral(zkClient, path, data, acls)
+      ZkPath.createEphemeral(zkClient, path, data, acl)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
-        ZkPath.createEphemeral(zkClient, path, data, acls)
+        ZkPath.createEphemeral(zkClient, path, data, acl)
     }
   }
 
@@ -449,9 +475,10 @@ class ZkUtils(val zkClient: ZkClient,
    * Create an ephemeral node with the given path and data.
    * Throw NodeExistException if node already exists.
    */
-  def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
+  def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      createEphemeralPath(path, data, acls)
+      createEphemeralPath(path, data, acl)
     } catch {
       case e: ZkNodeExistsException =>
         // this can happen when there is connection loss; make sure the data is what we intend to write
@@ -474,18 +501,20 @@ class ZkUtils(val zkClient: ZkClient,
   /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
-  def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = DefaultAcls): Unit = {
+  def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = useDefaultAcl): Unit = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      ZkPath.createPersistent(zkClient, path, data, acls)
+      ZkPath.createPersistent(zkClient, path, data, acl)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
-        ZkPath.createPersistent(zkClient, path, data, acls)
+        ZkPath.createPersistent(zkClient, path, data, acl)
     }
   }
 
-  def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = DefaultAcls): String = {
-    ZkPath.createPersistentSequential(zkClient, path, data, acls)
+  def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = useDefaultAcl): String = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
+    ZkPath.createPersistentSequential(zkClient, path, data, acl)
   }
 
   /**
@@ -493,14 +522,15 @@ class ZkUtils(val zkClient: ZkClient,
    * create parent directory if necessary. Never throw NodeExistException.
    * Return the updated path zkVersion
    */
-  def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls) = {
+  def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl) = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
       zkClient.writeData(path, data)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
         try {
-          ZkPath.createPersistent(zkClient, path, data, acls)
+          ZkPath.createPersistent(zkClient, path, data, acl)
         } catch {
           case _: ZkNodeExistsException =>
             zkClient.writeData(path, data)
@@ -563,13 +593,14 @@ class ZkUtils(val zkClient: ZkClient,
    * Update the value of a persistent node with the given path and data.
    * create parent directory if necessary. Never throw NodeExistException.
    */
-  def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
+  def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
       zkClient.writeData(path, data)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
-        ZkPath.createEphemeral(zkClient, path, data, acls)
+        ZkPath.createEphemeral(zkClient, path, data, acl)
     }
   }
 
@@ -818,13 +849,14 @@ class ZkUtils(val zkClient: ZkClient,
     * It uses the stat returned by the zookeeper and return the version. Every time
     * client updates the path stat.version gets incremented. Starting value of sequence number is 1.
     */
-  def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = {
+  def getSequenceId(path: String, acls: java.util.List[ACL] = useDefaultAcl): Int = {
+    val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
     def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
     try {
       writeToZk
     } catch {
       case _: ZkNoNodeException =>
-        makeSurePersistentPathExists(path)
+        makeSurePersistentPathExists(path, acl)
         writeToZk
     }
   }
@@ -990,6 +1022,7 @@ class ZKCheckedEphemeral(path: String,
   private val getDataCallback = new GetDataCallback
   val latch: CountDownLatch = new CountDownLatch(1)
   var result: Code = Code.OK
+  val defaultAcls = ZkUtils.defaultAcls(isSecure, path)
 
   private class CreateCallback extends StringCallback {
     def processResult(rc: Int,
@@ -1051,7 +1084,7 @@ class ZKCheckedEphemeral(path: String,
   private def createEphemeral() {
     zkHandle.create(path,
                     ZKStringSerializer.serialize(data),
-                    DefaultAcls(isSecure),
+                    defaultAcls,
                     CreateMode.EPHEMERAL,
                     createCallback,
                     null)
@@ -1064,7 +1097,7 @@ class ZKCheckedEphemeral(path: String,
     } else {
       zkHandle.create(prefix,
                       new Array[Byte](0),
-                      DefaultAcls(isSecure),
+                      defaultAcls,
                       CreateMode.PERSISTENT,
                       new StringCallback() {
                         def processResult(rc : Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/08a5b642/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 3b4c48e..ba1becc 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -83,7 +83,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
         val aclList = zkUtils.zkConnection.getAcl(path).getKey
         assertTrue(aclList.size == 2)
         for (acl: ACL <- aclList.asScala) {
-          assertTrue(isAclSecure(acl))
+          assertTrue(isAclSecure(acl, false))
         }
       }
     }
@@ -158,7 +158,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       zkUtils.makeSurePersistentPathExists(path)
       zkUtils.createPersistentPath(s"$path/fpjwashere", "")
     }
-    zkUtils.zkConnection.setAcl("/", zkUtils.DefaultAcls, -1)
+    zkUtils.zkConnection.setAcl("/", zkUtils.defaultAcls("/"), -1)
     deleteAllUnsecure()
   }
   
@@ -185,7 +185,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    */
   private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) {
     info(s"zkConnect string: $zkUrl")
-    for (path <- ZkUtils.SecureZkRootPaths) {
+    for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
       info(s"Creating $path")
       firstZk.makeSurePersistentPathExists(path)
       // Create a child for each znode to exercise the recurrent
@@ -206,39 +206,41 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       }
     ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
-    for (path <- ZkUtils.SecureZkRootPaths) {
+    for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
+      val sensitive = ZkUtils.sensitivePath(path)
       val listParent = secondZk.zkConnection.getAcl(path).getKey
-      assertTrue(path, isAclCorrect(listParent, secondZk.isSecure))
+      assertTrue(path, isAclCorrect(listParent, secondZk.isSecure, sensitive))
 
       val childPath = path + "/fpjwashere"
       val listChild = secondZk.zkConnection.getAcl(childPath).getKey
-      assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure))
+      assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure, sensitive))
     }
     // Check consumers path.
     val consumersAcl = firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath).getKey
-    assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false))
+    assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false, false))
   }
 
   /**
    * Verifies that the path has the appropriate secure ACL.
    */
   private def verify(path: String): Boolean = {
+    val sensitive = ZkUtils.sensitivePath(path)
     val list = zkUtils.zkConnection.getAcl(path).getKey
-    list.asScala.forall(isAclSecure)
+    list.asScala.forall(isAclSecure(_, sensitive))
   }
 
   /**
    * Verifies ACL.
    */
-  private def isAclCorrect(list: java.util.List[ACL], secure: Boolean): Boolean = {
+  private def isAclCorrect(list: java.util.List[ACL], secure: Boolean, sensitive: Boolean): Boolean = {
     val isListSizeCorrect =
-      if (secure)
+      if (secure && !sensitive)
         list.size == 2
       else
         list.size == 1
     isListSizeCorrect && list.asScala.forall(
       if (secure)
-        isAclSecure
+        isAclSecure(_, sensitive)
       else
         isAclUnsecure
     )
@@ -249,10 +251,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * values are based on the constants used in the 
    * ZooKeeper code base.
    */
-  private def isAclSecure(acl: ACL): Boolean = {
+  private def isAclSecure(acl: ACL, sensitive: Boolean): Boolean = {
     info(s"ACL $acl")
     acl.getPerms match {
-      case 1 => acl.getId.getScheme.equals("world")
+      case 1 => !sensitive && acl.getId.getScheme.equals("world")
       case 31 => acl.getId.getScheme.equals("sasl")
       case _ => false
     }


Mime
View raw message