kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2211: Adding simpleAclAuthorizer implementation and test cases.
Date Tue, 22 Sep 2015 00:47:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6ec88f7f8 -> 0990b6ba6


KAFKA-2211: Adding simpleAclAuthorizer implementation and test cases.

Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #195 from Parth-Brahmbhatt/KAFKA-2211


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0990b6ba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0990b6ba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0990b6ba

Branch: refs/heads/trunk
Commit: 0990b6ba6a28276f79a1a8bfaf48455c6eddfa1f
Parents: 6ec88f7
Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Authored: Mon Sep 21 17:47:18 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Sep 21 17:47:18 2015 -0700

----------------------------------------------------------------------
 .../common/requests/LeaderAndIsrRequest.java    |   2 +-
 config/log4j.properties                         |  11 +
 .../ZkNodeChangeNotificationListener.scala      | 129 +++++++++
 .../scala/kafka/network/RequestChannel.scala    |   2 +-
 .../main/scala/kafka/network/SocketServer.scala |   3 +-
 .../scala/kafka/security/auth/Authorizer.scala  |   4 +-
 .../security/auth/SimpleAclAuthorizer.scala     | 284 +++++++++++++++++++
 .../ZkNodeChangeNotificationListenerTest.scala  |  61 ++++
 .../security/auth/SimpleAclAuthorizerTest.scala | 233 +++++++++++++++
 9 files changed, 724 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 6b16496..002beef 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-import java.nio.ByteBuffer;;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 public class LeaderAndIsrRequest extends AbstractRequest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index c51ab8b..bf816e7 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -49,6 +49,12 @@ log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
 log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
+log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
+log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
 # Turn on all our debugging info
 #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
 #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -74,3 +80,8 @@ log4j.additivity.kafka.log.LogCleaner=false
 
 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false
+
+#Change this to debug to get the actual audit log for authorizer.
+log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
+log4j.additivity.kafka.authorizer.logger=false
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
new file mode 100644
index 0000000..eb44c31
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import scala.collection.JavaConverters._
+
+/**
+ * Handle the notificationMessage.
+ */
+trait NotificationHandler {
+  def processNotification(notificationMessage: String)
+}
+
+/**
+ * A listener that subscribes to seqNodeRoot for any child changes where all children are
assumed to be sequence node
+ * with seqNodePrefix. When a child is added under seqNodeRoot this class gets notified,
it looks at lastExecutedChange
+ * number to avoid duplicate processing and if it finds an unprocessed child, it reads its
data and calls supplied
+ * notificationHandler's processNotification() method with the child's data as argument.
As part of processing these changes it also
+ * purges any children with currentTime - createTime > changeExpirationMs.
+ *
+ * The caller/user of this class should ensure that they use zkClient.subscribeStateChanges
and call processAllNotifications
+ * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary
to ensure that if zk session
+ * is terminated and reestablished any missed notification will be processed immediately.
+ * @param zkClient
+ * @param seqNodeRoot
+ * @param seqNodePrefix
+ * @param notificationHandler
+ * @param changeExpirationMs
+ * @param time
+ */
+class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
+                                       private val seqNodeRoot: String,
+                                       private val seqNodePrefix: String,
+                                       private val notificationHandler: NotificationHandler,
+                                       private val changeExpirationMs: Long = 15 * 60 * 1000,
+                                       private val time: Time = SystemTime) extends Logging
{
+  private var lastExecutedChange = -1L
+
+  /**
+   * create seqNodeRoot and begin watching for any new children nodes.
+   */
+  def init() {
+    ZkUtils.makeSurePersistentPathExists(zkClient, seqNodeRoot)
+    zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
+    processAllNotifications()
+  }
+
+  /**
+   * Process all changes
+   */
+  def processAllNotifications() {
+    val changes = zkClient.getChildren(seqNodeRoot)
+    processNotifications(changes.asScala.sorted)
+  }
+
+  /**
+   * Process the given list of notifications
+   */
+  private def processNotifications(notifications: Seq[String]) {
+    if (notifications.nonEmpty) {
+      info(s"Processing notification(s) to $seqNodeRoot")
+      val now = time.milliseconds
+      for (notification <- notifications) {
+        val changeId = changeNumber(notification)
+        if (changeId > lastExecutedChange) {
+          val changeZnode = seqNodeRoot + "/" + notification
+          val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+          data map (notificationHandler.processNotification(_)) getOrElse(logger.warn(s"read
null data from $changeZnode when processing notification $notification"))
+        }
+        lastExecutedChange = changeId
+      }
+      purgeObsoleteNotifications(now, notifications)
+    }
+  }
+
+  /**
+   * Purges expired notifications.
+   * @param now
+   * @param notifications
+   */
+  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
+    for (notification <- notifications.sorted) {
+      val notificationNode = seqNodeRoot + "/" + notification
+      val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, notificationNode)
+      if (data.isDefined) {
+        if (now - stat.getCtime > changeExpirationMs) {
+          debug(s"Purging change notification $notificationNode")
+          ZkUtils.deletePath(zkClient, notificationNode)
+        }
+      }
+    }
+  }
+
+  /* get the change number from a change notification znode */
+  private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong
+
+  /**
+   * A listener that gets invoked when a node is created to notify changes.
+   */
+  object NodeChangeListener extends IZkChildListener {
+    override def handleChildChange(path: String, notifications: java.util.List[String]) {
+      try {
+        import scala.collection.JavaConverters._
+        if (notifications != null)
+          processNotifications(notifications.asScala.sorted)
+      } catch {
+        case e: Exception => error(s"Error processing notification change for path = $path
and notification= $notifications :", e)
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 5d09a16..798e01d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -46,7 +46,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Session(principal: Principal, host: String)
+  case class Session(principal: KafkaPrincipal, host: String)
 
   case class Request(processor: Int, connectionId: String, session: Session, private var
buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
     @volatile var requestDequeueTimeMs = -1L

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d46603b..57ee318 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,6 +33,7 @@ import kafka.utils._
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.network.{ChannelBuilders, InvalidReceiveException, ChannelBuilder,
PlaintextChannelBuilder, SSLChannelBuilder}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -417,7 +418,7 @@ private[kafka] class Processor(val id: Int,
           try {
 
             val channel = selector.channelForId(receive.source);
-            val session = RequestChannel.Session(channel.principal, channel.socketDescription)
+            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
channel.principal().getName), channel.socketDescription)
             val req = RequestChannel.Request(processor = id, connectionId = receive.source,
session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol
= protocol)
             requestChannel.sendRequest(req)
           } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 1f0547d..1569471 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -74,8 +74,8 @@ trait Authorizer extends Configurable {
   /**
    * get the acls for this principal.
    * @param principal
-   * @return empty set if no acls exist for this principal, otherwise the acls for the principal.
+   * @return empty Map if no acls exist for this principal, otherwise a map of resource ->
acls for the principal.
    */
-  def getAcls(principal: KafkaPrincipal): Set[Acl]
+  def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
new file mode 100644
index 0000000..8457cb8
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import java.util
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+
+import kafka.network.RequestChannel.Session
+import kafka.server.KafkaConfig
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import scala.collection.JavaConverters._
+import org.apache.log4j.Logger
+
+object SimpleAclAuthorizer {
+  //optional override zookeeper cluster configuration where acls will be stored, if not specified
acls will be stored in
+  //same zookeeper where all other kafka broker info is stored.
+  val ZkUrlProp = "authorizer.zookeeper.url"
+  val ZkConnectionTimeOutProp = "authorizer.zookeeper.connection.timeout.ms"
+  val ZkSessionTimeOutProp = "authorizer.zookeeper.session.timeout.ms"
+
+  //List of users that will be treated as super users and will have access to all the resources
for all actions from all hosts, defaults to no super users.
+  val SuperUsersProp = "super.users"
+  //If set to true when no acls are found for a resource , authorizer allows access to everyone.
Defaults to false.
+  val AllowEveryoneIfNoAclIsFoundProp = "allow.everyone.if.no.acl.found"
+
+  /**
+   * The root acl storage node. Under this node there will be one child node per resource
type (Topic, Cluster, ConsumerGroup).
+   * under each resourceType there will be a unique child for each resource instance and
the data for that child will contain
+   * list of its acls as a json object. Following gives an example:
+   *
+   * <pre>
+   * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType":
"Allow","operation": "Read","principal": "User:alice"}]}
+   * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType":
"Allow","operation": "Read","principal": "User:alice"}]}
+   * /kafka-acl/ConsumerGroup/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType":
"Allow","operation": "Read","principal": "User:alice"}]}
+   * </pre>
+   */
+  val AclZkPath = "/kafka-acl"
+
+  //notification node which gets updated with the resource name when acl on a resource is
changed.
+  val AclChangedZkPath = "/kafka-acl-changes"
+
+  //prefix of all the change notificiation sequence node.
+  val AclChangedPrefix = "acl_changes_"
+}
+
+class SimpleAclAuthorizer extends Authorizer with Logging {
+  private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
+  private var superUsers = Set.empty[KafkaPrincipal]
+  private var shouldAllowEveryoneIfNoAclIsFound = false
+  private var zkClient: ZkClient = null
+  private var aclChangeListener: ZkNodeChangeNotificationListener = null
+
+  private val aclCache = new scala.collection.mutable.HashMap[Resource, Set[Acl]]
+  private val lock = new ReentrantReadWriteLock()
+
+  /**
+   * Guaranteed to be called before any authorize call is made.
+   */
+  override def configure(javaConfigs: util.Map[String, _]) {
+    val configs = javaConfigs.asScala
+    val props = new java.util.Properties()
+    configs foreach { case (key, value) => props.put(key, value.toString) }
+    val kafkaConfig = KafkaConfig.fromProps(props)
+
+    superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
+      case str: String if str.nonEmpty => str.split(",").map(s => KafkaPrincipal.fromString(s.trim)).toSet
+    }.getOrElse(Set.empty[KafkaPrincipal])
+
+    shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).map(_.toString.toBoolean).getOrElse(false)
+
+    val zkUrl = configs.getOrElse(SimpleAclAuthorizer.ZkUrlProp, kafkaConfig.zkConnect).toString
+    val zkConnectionTimeoutMs = configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp,
kafkaConfig.zkConnectionTimeoutMs).toString.toInt
+    val zkSessionTimeOutMs = configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp,
kafkaConfig.zkSessionTimeoutMs).toString.toInt
+
+    zkClient = ZkUtils.createZkClient(zkUrl, zkConnectionTimeoutMs, zkSessionTimeOutMs)
+    ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclZkPath)
+
+    loadCache()
+
+    ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclChangedZkPath)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, SimpleAclAuthorizer.AclChangedZkPath,
SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler)
+    aclChangeListener.init()
+
+    zkClient.subscribeStateChanges(ZkStateChangeListener)
+  }
+
+  override def authorize(session: Session, operation: Operation, resource: Resource): Boolean
= {
+    val principal: KafkaPrincipal = session.principal
+    val host = session.host
+    val acls = getAcls(resource)
+
+    //check if there is any Deny acl match that would disallow this operation.
+    val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
+
+    //if principal is allowed to read or write we allow describe by default, the reverse
does not apply to Deny.
+    val ops = if (Describe == operation)
+      Set[Operation](operation, Read, Write)
+    else
+      Set[Operation](operation)
+
+    //now check if there is any allow acl that will allow this operation.
+    val allowMatch = ops.exists(operation => aclMatch(session, operation, resource, principal,
host, Allow, acls))
+
+    //we allow an operation if a user is a super user or if no acls are found and user has
configured to allow all users
+    //when no acls are found or if no deny acls are found and at least one allow acls matches.
+    val authorized = isSuperUser(operation, resource, principal, host) ||
+      isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
+      (!denyMatch && allowMatch)
+
+    logAuditMessage(principal, authorized, operation, resource, host)
+    authorized
+  }
+
+  def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal,
host: String, acls: Set[Acl]): Boolean = {
+    if (acls.isEmpty) {
+      authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
+      shouldAllowEveryoneIfNoAclIsFound
+    } else false
+  }
+
+  def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host:
String): Boolean = {
+    if (superUsers.exists( _ == principal)) {
+      authorizerLogger.debug(s"principal = $principal is a super user, allowing operation
without checking acls.")
+      true
+    } else false
+  }
+
+  private def aclMatch(session: Session, operations: Operation, resource: Resource, principal:
KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
+    acls.find ( acl =>
+      acl.permissionType == permissionType
+        && (acl.principal == principal || acl.principal == Acl.WildCardPrincipal)
+        && (operations == acl.operation || acl.operation == All)
+        && (acl.host == host || acl.host == Acl.WildCardHost)
+    ).map { acl: Acl =>
+      authorizerLogger.debug(s"operation = $operations on resource = $resource from host
= $host is $permissionType based on acl = $acl")
+      true
+    }.getOrElse(false)
+  }
+
+  override def addAcls(acls: Set[Acl], resource: Resource) {
+    if (acls != null && acls.nonEmpty) {
+      val updatedAcls = getAcls(resource) ++ acls
+      val path = toResourcePath(resource)
+
+      if (ZkUtils.pathExists(zkClient, path))
+        ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+      else
+        ZkUtils.createPersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+
+      updateAclChangedFlag(resource)
+      updateCache(resource, updatedAcls)
+    }
+  }
+
+  override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
+    if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
+      val existingAcls = getAcls(resource)
+      val filteredAcls = existingAcls.filter((acl: Acl) => !aclsTobeRemoved.contains(acl))
+
+      val aclNeedsRemoval = (existingAcls != filteredAcls)
+      if (aclNeedsRemoval) {
+        val path: String = toResourcePath(resource)
+        if (filteredAcls.nonEmpty)
+          ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
+        else
+          ZkUtils.deletePath(zkClient, toResourcePath(resource))
+
+        updateAclChangedFlag(resource)
+        updateCache(resource, filteredAcls)
+      }
+
+      aclNeedsRemoval
+    } else false
+  }
+
+  override def removeAcls(resource: Resource): Boolean = {
+    if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
+      ZkUtils.deletePath(zkClient, toResourcePath(resource))
+      updateAclChangedFlag(resource)
+      updateCache(resource, Set.empty[Acl])
+      true
+    } else false
+  }
+
+  override def getAcls(resource: Resource): Set[Acl] = {
+    inReadLock(lock) {
+      aclCache.get(resource).getOrElse(Set.empty[Acl])
+    }
+  }
+
+  private def getAclsFromZk(resource: Resource): Set[Acl] = {
+    val aclJson = ZkUtils.readDataMaybeNull(zkClient, toResourcePath(resource))._1
+    aclJson.map(Acl.fromJson).getOrElse(Set.empty)
+  }
+
+  override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
+    aclCache.mapValues { acls =>
+      acls.filter(_.principal == principal)
+    }.filter { case (_, acls) =>
+      acls.nonEmpty
+    }.toMap
+  }
+
+  private def loadCache()  {
+    var acls = Set.empty[Acl]
+    val resourceTypes = ZkUtils.getChildren(zkClient, SimpleAclAuthorizer.AclZkPath)
+    for (rType <- resourceTypes) {
+      val resourceType = ResourceType.fromString(rType)
+      val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
+      val resourceNames = ZkUtils.getChildren(zkClient, resourceTypePath)
+      for (resourceName <- resourceNames) {
+        acls = getAclsFromZk(Resource(resourceType, resourceName.toString))
+        updateCache(new Resource(resourceType, resourceName), acls)
+      }
+    }
+  }
+
+  private def updateCache(resource: Resource, acls: Set[Acl]) {
+    inWriteLock(lock) {
+      if (acls.nonEmpty)
+        aclCache.put(resource, acls)
+      else
+        aclCache.remove(resource)
+    }
+  }
+
+  def toResourcePath(resource: Resource): String = {
+    SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + resource.name
+  }
+
+  private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation:
Operation, resource: Resource, host: String) {
+    val permissionType = if (authorized) "Allowed" else "Denied"
+    authorizerLogger.debug(s"Principal = $principal is $permissionType Operation = $operation
from host = $host on resource = $resource")
+  }
+
+  private def updateAclChangedFlag(resource: Resource) {
+    ZkUtils.createSequentialPersistentPath(zkClient, SimpleAclAuthorizer.AclChangedZkPath
+ "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
+  }
+
+  object AclChangedNotificaitonHandler extends NotificationHandler {
+
+    override def processNotification(notificationMessage: String) {
+      val resource: Resource = Resource.fromString(notificationMessage)
+      val acls = getAclsFromZk(resource)
+      updateCache(resource, acls)
+    }
+  }
+
+  object ZkStateChangeListener extends IZkStateListener {
+
+    override def handleNewSession() {
+      aclChangeListener.processAllNotifications
+    }
+
+    override def handleSessionEstablishmentError(error: Throwable) {
+      fatal("Could not establish session with zookeeper", error)
+    }
+
+    override def handleStateChanged(state: KeeperState) {
+      //no op
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
new file mode 100644
index 0000000..d43778e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, ZkUtils}
+import org.junit.Test
+
+class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
+
+  override def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0,
zkConnect)))
+
+  @Test
+  def testProcessNotification() {
+    val notificationHandler = new NotificationHandler {
+      @volatile var notification: String = _
+      @volatile var invocationCount: Integer = 0
+      override def processNotification(notificationMessage: String): Unit = {
+        notification = notificationMessage
+        invocationCount += 1
+      }
+    }
+
+    val seqNodeRoot = "/root"
+    val seqNodePrefix = "prefix"
+    val seqNodePath = seqNodeRoot + "/" + seqNodePrefix
+    val notificationMessage1 = "message1"
+    val notificationMessage2 = "message2"
+    val changeExpirationMs = 100
+
+    val notificationListener = new ZkNodeChangeNotificationListener(zkClient, seqNodeRoot,
seqNodePrefix, notificationHandler, changeExpirationMs)
+    notificationListener.init()
+
+    ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage1)
+
+    TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 &&
notificationHandler.notification == notificationMessage1, "failed to send/process notification
message in the timeout period.")
+
+    /*There is no easy way to test that purging. Even if we mock kafka time with MockTime,
the purging compares kafka time with the time stored in zookeeper stat and the
+    embeded zookeeper server does not provide a way to mock time. so to test purging we will
have to use SystemTime.sleep(changeExpirationMs + 1) issue a write and check
+    Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size) however even
after that the assertion can fail as the second node it self can be deleted
+    depending on how threads get scheduled.*/
+
+    ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage2)
+    TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 &&
notificationHandler.notification == notificationMessage2, "failed to send/process notification
message in the timeout period.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
new file mode 100644
index 0000000..d3efc36
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import java.util.UUID
+
+import kafka.network.RequestChannel.Session
+import kafka.security.auth.Acl.WildCardHost
+import kafka.server.KafkaConfig
+import kafka.utils.{ZkUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
+
+  var simpleAclAuthorizer = new SimpleAclAuthorizer
+  val testPrincipal = Acl.WildCardPrincipal
+  val testHostName = "test.host.com"
+  var session = new Session(testPrincipal, testHostName)
+  var resource: Resource = null
+  val superUsers = "User:superuser1, User:superuser2"
+  val username = "alice"
+  var config: KafkaConfig = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
+
+    config = KafkaConfig.fromProps(props)
+    simpleAclAuthorizer.configure(config.originals)
+    resource = new Resource(Topic, UUID.randomUUID().toString)
+  }
+
+  @Test
+  def testTopicAcl() {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
+    val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
+    val host1 = "host1"
+    val host2 = "host2"
+
+    //user1 has READ access from host1 and host2.
+    val acl1 = new Acl(user1, Allow, host1, Read)
+    val acl2 = new Acl(user1, Allow, host2, Read)
+
+    //user1 does not have  READ access from host1.
+    val acl3 = new Acl(user1, Deny, host1, Read)
+
+    //user1 has Write access from host1 only.
+    val acl4 = new Acl(user1, Allow, host1, Write)
+
+    //user1 has DESCRIBE access from all hosts.
+    val acl5 = new Acl(user1, Allow, WildCardHost, Describe)
+
+    //user2 has READ access from all hosts.
+    val acl6 = new Acl(user2, Allow, WildCardHost, Read)
+
+    //user3 has WRITE access from all hosts.
+    val acl7 = new Acl(user3, Allow, WildCardHost, Write)
+
+    val acls = Set[Acl](acl1, acl2, acl3, acl4, acl5, acl6, acl7)
+
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    val host1Session = new Session(user1, host1)
+    val host2Session = new Session(user1, host2)
+
+    assertTrue("User1 should have READ access from host2", simpleAclAuthorizer.authorize(host2Session,
Read, resource))
+    assertFalse("User1 should not have READ access from host1 due to denyAcl", simpleAclAuthorizer.authorize(host1Session,
Read, resource))
+    assertTrue("User1 should have WRITE access from host1", simpleAclAuthorizer.authorize(host1Session,
Write, resource))
+    assertFalse("User1 should not have WRITE access from host2 as no allow acl is defined",
simpleAclAuthorizer.authorize(host2Session, Write, resource))
+    assertTrue("User1 should not have DESCRIBE access from host1", simpleAclAuthorizer.authorize(host1Session,
Describe, resource))
+    assertTrue("User1 should have DESCRIBE access from host2", simpleAclAuthorizer.authorize(host2Session,
Describe, resource))
+    assertFalse("User1 should not have edit access from host1", simpleAclAuthorizer.authorize(host1Session,
Alter, resource))
+    assertFalse("User1 should not have edit access from host2", simpleAclAuthorizer.authorize(host2Session,
Alter, resource))
+
+    //test if user has READ and write access they also get describe access
+    val user2Session = new Session(user2, host1)
+    val user3Session = new Session(user3, host1)
+    assertTrue("User2 should have DESCRIBE access from host1", simpleAclAuthorizer.authorize(user2Session,
Describe, resource))
+    assertTrue("User3 should have DESCRIBE access from host2", simpleAclAuthorizer.authorize(user3Session,
Describe, resource))
+    assertTrue("User2 should have READ access from host1", simpleAclAuthorizer.authorize(user2Session,
Read, resource))
+    assertTrue("User3 should have WRITE access from host2", simpleAclAuthorizer.authorize(user3Session,
Write, resource))
+  }
+
+  @Test
+  def testDenyTakesPrecedence() {
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val host = "random-host"
+    val session = new Session(user, host)
+
+    val allowAll = Acl.AllowAllAcl
+    val denyAcl = new Acl(user, Deny, host, All)
+    val acls = Set[Acl](allowAll, denyAcl)
+
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    assertFalse("deny should take precedence over allow.", simpleAclAuthorizer.authorize(session,
Read, resource))
+  }
+
+  @Test
+  def testAllowAllAccess() {
+    val allowAllAcl = Acl.AllowAllAcl
+
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
+
+    val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), "random.host")
+    assertTrue("allow all acl should allow access to all.", simpleAclAuthorizer.authorize(session,
Read, resource))
+  }
+
+  @Test
+  def testSuperUserHasAccess() {
+    val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
+
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
+
+    val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"),
"random.host")
+    val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"),
"random.host")
+
+    assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session1,
Read, resource))
+    assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2,
Read, resource))
+  }
+
+  @Test
+  def testNoAclFound() {
+    assertFalse("when acls = [],  authorizer should fail close.", simpleAclAuthorizer.authorize(session,
Read, resource))
+  }
+
+  @Test
+  def testNoAclFoundOverride() {
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+
+    val cfg = KafkaConfig.fromProps(props)
+    val testAuthoizer: SimpleAclAuthorizer = new SimpleAclAuthorizer
+    testAuthoizer.configure(cfg.originals)
+    assertTrue("when acls = null or [],  authorizer should fail open with allow.everyone
= true.", testAuthoizer.authorize(session, Read, resource))
+  }
+
+  @Test
+  def testAclManagementAPIs() {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+    val host1 = "host1"
+    val host2 = "host2"
+
+    val acl1 = new Acl(user1, Allow, host1, Read)
+    val acl2 = new Acl(user1, Allow, host1, Write)
+    val acl3 = new Acl(user2, Allow, host2, Read)
+    val acl4 = new Acl(user2, Allow, host2, Write)
+
+    var acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](acl1, acl2, acl3, acl4), Set.empty[Acl])
+
+    //test addAcl is additive
+    val acl5 = new Acl(user2, Allow, WildCardHost, Read)
+    acls = changeAclAndVerify(acls, Set[Acl](acl5), Set.empty[Acl])
+
+    //test get by principal name.
+    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1),
"changes not propogated in timeout period")
+    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2),
"changes not propogated in timeout period")
+
+    //test remove acl from existing acls.
+    changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
+
+    //test remove all acls for resource
+    simpleAclAuthorizer.removeAcls(resource)
+    TestUtils.waitUntilTrue(() => simpleAclAuthorizer.getAcls(resource) == Set.empty[Acl],
"changes not propagated in timeout period.")
+    assertTrue(!ZkUtils.pathExists(zkClient, simpleAclAuthorizer.toResourcePath(resource)))
+
+    //test removing last acl also deletes zookeeper path
+    acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
+    changeAclAndVerify(Set.empty[Acl], Set.empty[Acl], acls)
+    assertTrue(!ZkUtils.pathExists(zkClient, simpleAclAuthorizer.toResourcePath(resource)))
+  }
+
+  @Test
+  def testLoadCache() {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val acl1 = new Acl(user1, Allow, "host-1", Read)
+    val acls = Set[Acl](acl1)
+    simpleAclAuthorizer.addAcls(acls, resource)
+
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+    val resource1 = new Resource(Topic, "test-2")
+    val acl2 = new Acl(user2, Deny, "host3", Read)
+    val acls1 = Set[Acl](acl2)
+    simpleAclAuthorizer.addAcls(acls1, resource1)
+
+    ZkUtils.deletePathRecursive(zkClient, SimpleAclAuthorizer.AclChangedZkPath)
+    val authorizer = new SimpleAclAuthorizer
+    authorizer.configure(config.originals)
+
+    assertEquals(acls, authorizer.getAcls(resource))
+    assertEquals(acls1, authorizer.getAcls(resource1))
+  }
+
+  private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls:
Set[Acl]): Set[Acl] = {
+    var acls = originalAcls
+
+    if(addedAcls.nonEmpty) {
+      simpleAclAuthorizer.addAcls(addedAcls, resource)
+      acls ++= addedAcls
+    }
+
+    if(removedAcls.nonEmpty) {
+      simpleAclAuthorizer.removeAcls(removedAcls, resource)
+      acls --=removedAcls
+    }
+
+    TestUtils.waitUntilTrue(() => simpleAclAuthorizer.getAcls(resource) == acls, "changes
not propagated in timeout period.")
+
+    acls
+  }
+}
\ No newline at end of file


Mime
View raw message