kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2212: Authorizer CLI implementation.
Date Fri, 02 Oct 2015 01:04:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d03b871dd -> 5764e54de


KAFKA-2212: Authorizer CLI implementation.

Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #230 from Parth-Brahmbhatt/KAFKA-2212


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5764e54d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5764e54d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5764e54d

Branch: refs/heads/trunk
Commit: 5764e54de147af81aac85acc00687c23e9646a5c
Parents: d03b871
Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Authored: Thu Oct 1 18:04:09 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Oct 1 18:04:09 2015 -0700

----------------------------------------------------------------------
 bin/kafka-acls.sh                               |  17 +
 bin/windows/kafka-acls.bat                      |  17 +
 .../src/main/scala/kafka/admin/AclCommand.scala | 350 +++++++++++++++++++
 core/src/main/scala/kafka/common/BaseEnum.scala |  26 ++
 .../scala/kafka/security/auth/Authorizer.scala  |   5 +
 .../scala/kafka/security/auth/Operation.scala   |   4 +-
 .../kafka/security/auth/PermissionType.scala    |   6 +-
 .../scala/kafka/security/auth/Resource.scala    |   1 +
 .../kafka/security/auth/ResourceType.scala      |   8 +-
 .../security/auth/SimpleAclAuthorizer.scala     |   6 +-
 .../scala/kafka/utils/CommandLineUtils.scala    |   6 +-
 .../scala/unit/kafka/admin/AclCommandTest.scala | 134 +++++++
 .../security/auth/SimpleAclAuthorizerTest.scala |  43 ++-
 13 files changed, 604 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/bin/kafka-acls.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-acls.sh b/bin/kafka-acls.sh
new file mode 100755
index 0000000..fd0fb67
--- /dev/null
+++ b/bin/kafka-acls.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/bin/windows/kafka-acls.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-acls.bat b/bin/windows/kafka-acls.bat
new file mode 100644
index 0000000..7a78ae8
--- /dev/null
+++ b/bin/windows/kafka-acls.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.admin.AclCommand %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
new file mode 100644
index 0000000..6a8a8a2
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple._
+import kafka.security.auth._
+import kafka.utils._
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.JavaConverters._
+
+object AclCommand {
+
+  val Delimiter = ','
+  val Newline = scala.util.Properties.lineSeparator
+  val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
+    Topic -> Set(Read, Write, Describe),
+    ConsumerGroup -> Set(Read),
+    Cluster -> Set(Create, ClusterAction)
+  )
+
+  def main(args: Array[String]) {
+
+    val opts = new AclCommandOptions(args)
+
+    if (opts.options.has(opts.helpOpt))
+      CommandLineUtils.printUsageAndDie(opts.parser, "Usage:")
+
+    opts.checkArgs()
+
+    var authorizerProperties = Map.empty[String, Any]
+    if (opts.options.has(opts.authorizerPropertiesOpt)) {
+      val props = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala.map(_.split("="))
+      props.foreach(pair => authorizerProperties += (pair(0).trim -> pair(1).trim))
+    }
+
+    val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
+    val authZ: Authorizer = CoreUtils.createObject(authorizerClass)
+    authZ.configure(authorizerProperties.asJava)
+
+    try {
+      if (opts.options.has(opts.addOpt))
+        addAcl(authZ, opts)
+      else if (opts.options.has(opts.removeOpt))
+        removeAcl(authZ, opts)
+      else if (opts.options.has(opts.listOpt))
+        listAcl(authZ, opts)
+    } catch {
+      case e: Throwable =>
+        println(s"Error while executing topic Acl command ${e.getMessage}")
+        println(Utils.stackTrace(e))
+        System.exit(-1)
+    }
+  }
+
+  private def addAcl(authZ: Authorizer, opts: AclCommandOptions) {
+    val resourceToAcl = getResourceToAcls(opts)
+
+    if (resourceToAcl.values.exists(_.isEmpty))
+      CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principals,
--deny-principals when trying to add acls.")
+
+    for ((resource, acls) <- resourceToAcl) {
+      val acls = resourceToAcl(resource)
+      println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" +
_).mkString(Newline)} $Newline")
+      authZ.addAcls(acls, resource)
+    }
+
+    listAcl(authZ, opts)
+  }
+
+  private def removeAcl(authZ: Authorizer, opts: AclCommandOptions) {
+    val resourceToAcl = getResourceToAcls(opts)
+
+    for ((resource, acls) <- resourceToAcl) {
+      if (acls.isEmpty) {
+        if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource
y/n?"))
+          authZ.removeAcls(resource)
+      } else {
+        if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t"
+ _).mkString(Newline)} $Newline from resource $resource y/n?"))
+          authZ.removeAcls(acls, resource)
+      }
+    }
+
+    listAcl(authZ, opts)
+  }
+
+  private def listAcl(authZ: Authorizer, opts: AclCommandOptions) {
+    val resources = getResource(opts, dieIfNoResourceFound = false)
+
+    val resourceToAcls = if(resources.isEmpty)
+      authZ.getAcls()
+    else
+      resources.map(resource => (resource -> authZ.getAcls(resource)))
+
+    for ((resource, acls) <- resourceToAcls)
+      println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t"
+ _).mkString(Newline)} $Newline")
+  }
+
+  private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
+    var resourceToAcls = Map.empty[Resource, Set[Acl]]
+
+    //if none of the --producer or --consumer options are specified , just construct acls
from CLI options.
+    if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt))
{
+      resourceToAcls ++= getCliResourceToAcls(opts)
+    }
+
+    //users are allowed to specify both --producer and --consumer options in a single command.
+    if (opts.options.has(opts.producerOpt))
+      resourceToAcls ++= getProducerResourceToAcls(opts)
+
+    if (opts.options.has(opts.consumerOpt))
+      resourceToAcls ++= getConsumerResourceToAcls(opts).map { case (k, v) => k ->
(v ++ resourceToAcls.getOrElse(k, Set.empty[Acl])) }
+
+    validateOperation(opts, resourceToAcls)
+
+    resourceToAcls
+  }
+
+  private def getProducerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]]
= {
+    val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
+
+    val acls = getAcl(opts, Set(Write, Describe))
+
+    //Write, Describe permission on topics, Create permission on cluster
+    topics.map(_ -> acls).toMap[Resource, Set[Acl]] +
+      (Resource.ClusterResource -> getAcl(opts, Set(Create)))
+  }
+
+  private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]]
= {
+    val resources = getResource(opts)
+
+    val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
+    val consumerGroups: Set[Resource] = resources.filter(_.resourceType == ConsumerGroup)
+
+    //Read,Describe on topic, Read on consumerGroup + Create on cluster
+
+    val acls = getAcl(opts, Set(Read, Describe))
+
+    topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
+      consumerGroups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
+  }
+
+  private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
+    val acls = getAcl(opts)
+    val resources = getResource(opts)
+    resources.map(_ -> acls).toMap
+  }
+
+  private def getAcl(opts: AclCommandOptions, operations: Set[Operation]): Set[Acl] = {
+    val allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt)
+
+    val deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt)
+
+    val allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt)
+
+    val deniedHosts = getHosts(opts, opts.denyHostssOpt, opts.denyPrincipalsOpt)
+
+    val acls = new collection.mutable.HashSet[Acl]
+    if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty)
+      acls ++= getAcls(allowedPrincipals, Allow, operations, allowedHosts)
+
+    if (deniedHosts.nonEmpty && deniedPrincipals.nonEmpty)
+      acls ++= getAcls(deniedPrincipals, Deny, operations, deniedHosts)
+
+    acls.toSet
+  }
+
+  private def getAcl(opts: AclCommandOptions): Set[Acl] = {
+    val operations = opts.options.valuesOf(opts.operationsOpt).asScala.map(operation =>
Operation.fromString(operation.trim)).toSet
+    getAcl(opts, operations)
+  }
+
+  def getAcls(principals: Set[KafkaPrincipal], permissionType: PermissionType, operations:
Set[Operation],
+              hosts: Set[String]): Set[Acl] = {
+    for {
+      principal <- principals
+      operation <- operations
+      host <- hosts
+    } yield new Acl(principal, permissionType, host, operation)
+  }
+
+  private def getHosts(opts: AclCommandOptions, hostOptionSpec: ArgumentAcceptingOptionSpec[String],
+                       principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[String]
= {
+    if (opts.options.has(hostOptionSpec))
+      opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
+    else if (opts.options.has(principalOptionSpec))
+      Set[String](Acl.WildCardHost)
+    else
+      Set.empty[String]
+  }
+
+  private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]):
Set[KafkaPrincipal] = {
+    if (opts.options.has(principalOptionSpec))
+      opts.options.valuesOf(principalOptionSpec).asScala.map(s => KafkaPrincipal.fromString(s.trim)).toSet
+    else
+      Set.empty[KafkaPrincipal]
+  }
+
+  private def getResource(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true):
Set[Resource] = {
+    var resources = Set.empty[Resource]
+    if (opts.options.has(opts.topicOpt))
+      opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resources += new Resource(Topic,
topic.trim))
+
+    if (opts.options.has(opts.clusterOpt))
+      resources += Resource.ClusterResource
+
+    if (opts.options.has(opts.groupOpt))
+      opts.options.valuesOf(opts.groupOpt).asScala.foreach(consumerGroup => resources
+= new Resource(ConsumerGroup, consumerGroup.trim))
+
+    if (resources.isEmpty && dieIfNoResourceFound)
+      CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource:
--topic <topic> or --cluster or --consumer-group <group>")
+
+    resources
+  }
+
+  private def confirmAction(msg: String): Boolean = {
+    println(msg)
+    Console.readLine().equalsIgnoreCase("y")
+  }
+
+  private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[Resource, Set[Acl]])
= {
+    for ((resource, acls) <- resourceToAcls) {
+      val validOps = ResourceTypeToValidOperations(resource.resourceType)
+      if ((acls.map(_.operation) -- validOps).nonEmpty)
+        CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType ${resource.resourceType}
only supports operations ${validOps.mkString(Delimiter.toString)}")
+    }
+  }
+
+  class AclCommandOptions(args: Array[String]) {
+    val parser = new OptionParser
+    val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer,
defaults to kafka.security.auth.SimpleAclAuthorizer.")
+      .withRequiredArg
+      .describedAs("authorizer")
+      .ofType(classOf[String])
+      .defaultsTo(classOf[SimpleAclAuthorizer].getName)
+
+    val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "REQUIRED: properties
required to configure an instance of Authorizer. " +
+      "These are comma separated key=val pairs. For the default authorizer the example values
are: " +
+      "zookeeper.connect=localhost:2181")
+      .withRequiredArg
+      .describedAs("authorizer-properties")
+      .ofType(classOf[String])
+      .withValuesSeparatedBy(Delimiter)
+
+    val topicOpt = parser.accepts("topic", "Comma separated list of topic to which acls should
be added or removed. " +
+      "A value of * indicates acl should apply to all topics.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+      .withValuesSeparatedBy(Delimiter)
+
+    val clusterOpt = parser.accepts("cluster", "Add/Remove cluster acls.")
+    val groupOpt = parser.accepts("consumer-group", "Comma separated list of consumer groups
to which the acls should be added or removed. " +
+      "A value of * indicates the acls should apply to all consumer-groups.")
+      .withRequiredArg
+      .describedAs("consumer-group")
+      .ofType(classOf[String])
+      .withValuesSeparatedBy(Delimiter)
+
+    val addOpt = parser.accepts("add", "Indicates you are trying to add acls.")
+    val removeOpt = parser.accepts("remove", "Indicates you are trying to remove acls.")
+    val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic
<topic> or --consumer-group <group> or --cluster to specify a resource.")
+
+    val operationsOpt = parser.accepts("operations", "Comma separated list of operations,
default is All. Valid operation names are: " + Newline +
+      Operation.values.map("\t" + _).mkString(Newline) + Newline)
+      .withRequiredArg
+      .ofType(classOf[String])
+      .defaultsTo(All.name)
+      .withValuesSeparatedBy(Delimiter)
+
+    val allowPrincipalsOpt = parser.accepts("allow-principals", "Comma separated list of
principals where principal is in principalType:name format." +
+      " User:* is the wild card indicating all users.")
+      .withRequiredArg
+      .describedAs("allow-principals")
+      .ofType(classOf[String])
+      .withValuesSeparatedBy(Delimiter)
+
+    val denyPrincipalsOpt = parser.accepts("deny-principals", "Comma separated list of principals
where principal is in " +
+      "principalType: name format. By default anyone not in --allow-principals list is denied
access. " +
+      "You only need to use this option as negation to already allowed set. " +
+      "For example if you wanted to allow access to all users in the system but not test-user
you can define an acl that " +
+      "allows access to User:* and specify --deny-principals=User:test@EXAMPLE.COM. " +
+      "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
+      .withRequiredArg
+      .describedAs("deny-principals")
+      .ofType(classOf[String])
+      .withValuesSeparatedBy(Delimiter)
+
+    val allowHostsOpt = parser.accepts("allow-hosts", "Comma separated list of hosts from
which principals listed in --allow-principals will have access. " +
+      "If you have specified --allow-principals then the default for this option will be
set to * which allows access from all hosts.")
+      .withRequiredArg
+      .describedAs("allow-hosts")
+      .ofType(classOf[String])
+      .withValuesSeparatedBy(Delimiter)
+
+    val denyHostssOpt = parser.accepts("deny-hosts", "Comma separated list of hosts from
which principals listed in --deny-principals will be denied access. " +
+      "If you have specified --deny-principals then the default for this option will be set
to * which denies access from all hosts.")
+      .withRequiredArg
+      .describedAs("deny-hosts")
+      .ofType(classOf[String])
+      .withValuesSeparatedBy(Delimiter)
+
+    val producerOpt = parser.accepts("producer", "Convenience option to add/remove acls for
producer role. " +
+      "This will generate acls that allows WRITE,DESCRIBE on topic and CREATE on cluster.
")
+
+    val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove acls for
consumer role. " +
+      "This will generate acls that allows READ,DESCRIBE on topic and READ on consumer-group.")
+
+    val helpOpt = parser.accepts("help", "Print usage information.")
+
+    val options = parser.parse(args: _*)
+
+    def checkArgs() {
+      CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
+
+      val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
+      if (actions != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action:
--list, --add, --remove. ")
+
+      CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt,
allowHostsOpt, allowPrincipalsOpt, denyHostssOpt, denyPrincipalsOpt))
+
+      //when --producer or --consumer is specified , user should not specify operations as
they are inferred and we also disallow --deny-principals and --deny-hosts.
+      CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt,
denyPrincipalsOpt, denyHostssOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt,
denyPrincipalsOpt, denyHostssOpt))
+
+      if (options.has(producerOpt) && !options.has(topicOpt))
+        CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")
+
+      if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt)
|| (!options.has(producerOpt) && options.has(clusterOpt))))
+        CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic
and a --consumer-group and no --cluster option should be specified.")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/common/BaseEnum.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/BaseEnum.scala b/core/src/main/scala/kafka/common/BaseEnum.scala
new file mode 100644
index 0000000..9c39466
--- /dev/null
+++ b/core/src/main/scala/kafka/common/BaseEnum.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+/*
+ * We inherit from `Product` and `Serializable` because `case` objects and classes inherit
from them and if we don't
+ * do it here, the compiler will infer types that unexpectedly include `Product` and `Serializable`,
see
+ * http://underscore.io/blog/posts/2015/06/04/more-on-sealed.html for more information.
+ */
+trait BaseEnum extends Product with Serializable {
+  def name: String
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 1569471..8f1a660 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -77,5 +77,10 @@ trait Authorizer extends Configurable {
    * @return empty Map if no acls exist for this principal, otherwise a map of resource ->
acls for the principal.
    */
   def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
+
+  /**
+   * gets the map of resource to acls for all resources.
+   */
+  def getAcls(): Map[Resource, Set[Acl]]
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index c172e1e..5d31c62 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -16,13 +16,13 @@
  */
 package kafka.security.auth
 
-import kafka.common.KafkaException
+import kafka.common.{BaseEnum, KafkaException}
 
 /**
  * Different operations a client may perform on kafka resources.
  */
 
-sealed trait Operation { def name: String}
+sealed trait Operation extends BaseEnum
 case object Read extends Operation { val name = "Read" }
 case object Write extends Operation { val name = "Write" }
 case object Create extends Operation { val name = "Create" }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index e668d83..fd2a0fe 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -16,16 +16,14 @@
  */
 package kafka.security.auth
 
-import kafka.common.KafkaException
+import kafka.common.{BaseEnum, KafkaException}
 
 /**
  * PermissionType.
  */
 
 
-sealed trait PermissionType {
-  def name: String
-}
+sealed trait PermissionType extends BaseEnum
 
 case object Allow extends PermissionType {
   val name = "Allow"

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index d4c46a9..797c77b 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -20,6 +20,7 @@ object Resource {
   val Separator = ":"
   val ClusterResourceName = "kafka-cluster"
   val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName)
+  val WildCardResource = "*"
 
   def fromString(str: String): Resource = {
     str.split(Separator, 2) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index a0696e1..3b8312d 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -16,16 +16,14 @@
  */
 package kafka.security.auth
 
-import kafka.common.KafkaException
+import kafka.common.{BaseEnum, KafkaException}
 
 /**
  * ResourceTypes.
  */
 
 
-sealed trait ResourceType {
-  def name: String
-}
+sealed trait ResourceType extends BaseEnum
 
 case object Cluster extends ResourceType {
   val name = "Cluster"
@@ -48,4 +46,4 @@ object ResourceType {
   }
 
   def values: Seq[ResourceType] = List(Cluster, Topic, ConsumerGroup)
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 8457cb8..6576264 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -107,7 +107,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean
= {
     val principal: KafkaPrincipal = session.principal
     val host = session.host
-    val acls = getAcls(resource)
+    val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
 
     //check if there is any Deny acl match that would disallow this operation.
     val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
@@ -221,6 +221,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }.toMap
   }
 
+  override def getAcls(): Map[Resource, Set[Acl]] = {
+    aclCache.toMap
+  }
+
   private def loadCache()  {
     var acls = Set.empty[Acl]
     val resourceTypes = ZkUtils.getChildren(zkClient, SimpleAclAuthorizer.AclZkPath)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 086a624..c51735d 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -50,10 +50,10 @@ object CommandLineUtils extends Logging {
   /**
    * Print usage and exit
    */
-  def printUsageAndDie(parser: OptionParser, message: String) {
+  def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
     System.err.println(message)
     parser.printHelpOn(System.err)
-    System.exit(1)
+    sys.exit(1)
   }
 
   /**
@@ -73,4 +73,4 @@ object CommandLineUtils extends Logging {
     }
     props
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
new file mode 100644
index 0000000..f6d3667
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
+
+import java.io.StringReader
+import java.util.Properties
+
+import kafka.admin.AclCommand
+import kafka.security.auth._
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.{Assert, Test}
+
+class AclCommandTest extends ZooKeeperTestHarness with Logging {
+
+  private val Users = Set(KafkaPrincipal.fromString("User:test1"), KafkaPrincipal.fromString("User:test2"))
+  private val UsersString = Users.mkString(AclCommand.Delimiter.toString)
+  private val Hosts = Set("host1", "host2")
+  private val HostsString = Hosts.mkString(AclCommand.Delimiter.toString)
+
+  private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
+  private val ConsumerGroupResources = Set(new Resource(ConsumerGroup, "testGroup-1"), new
Resource(ConsumerGroup, "testGroup-2"))
+
+  private val ResourceToCommand = Map[Set[Resource], Array[String]](
+    TopicResources -> Array("--topic", "test-1,test-2"),
+    Set(Resource.ClusterResource) -> Array("--cluster"),
+    ConsumerGroupResources -> Array("--consumer-group", "testGroup-1,testGroup-2")
+  )
+
+  private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
+    TopicResources -> (Set(Read, Write, Describe), Array("--operations", "Read,Write,Describe")),
+    Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operations",
"Create,ClusterAction")),
+    ConsumerGroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read"))
+  )
+
+  private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]](
+    TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts),
+    Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Create), Hosts)
+  )
+
+  private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]](
+    TopicResources -> AclCommand.getAcls(Users, Allow, Set(Read, Describe), Hosts),
+    ConsumerGroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts)
+  )
+
+  private val CmdToResourcesToAcl = Map[Array[String], Map[Set[Resource], Set[Acl]]](
+    Array[String]("--producer") -> ProducerResourceToAcls,
+    Array[String]("--consumer") -> ConsumerResourceToAcls,
+    Array[String]("--producer", "--consumer") -> ConsumerResourceToAcls.map { case (k,
v) => k -> (v ++
+      ProducerResourceToAcls.getOrElse(k, Set.empty[Acl])) }
+  )
+
+  @Test
+  def testAclCli() {
+    val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
+    brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
+    val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
+
+    for ((resources, resourceCmd) <- ResourceToCommand) {
+      for (permissionType <- PermissionType.values) {
+        val operationToCmd = ResourceToOperations(resources)
+        val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
+          AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
+          for (resource <- resources) {
+            Assert.assertEquals(acls, getAuthorizer(brokerProps).getAcls(resource))
+          }
+
+          testRemove(resources, resourceCmd, args, brokerProps)
+      }
+    }
+  }
+
+  @Test
+  def testProducerConsumerCli() {
+    val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
+    brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
+    val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
+
+    for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
+      val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_
++ _)
+      AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
+      for ((resources, acls) <- resourcesToAcls) {
+        for (resource <- resources) {
+          Assert.assertEquals(acls, getAuthorizer(brokerProps).getAcls(resource))
+        }
+      }
+      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps)
+    }
+  }
+
+  private def testRemove(resources: Set[Resource], resourceCmd: Array[String], args: Array[String],
brokerProps: Properties) {
+    for (resource <- resources) {
+      Console.withIn(new StringReader(s"y${AclCommand.Newline}" * resources.size)) {
+        AclCommand.main(args ++ resourceCmd :+ "--remove")
+        Assert.assertEquals(Set.empty[Acl], getAuthorizer(brokerProps).getAcls(resource))
+      }
+    }
+  }
+
+  private def getAclToCommand(permissionType: PermissionType, operations: Set[Operation]):
(Set[Acl], Array[String]) = {
+    (AclCommand.getAcls(Users, permissionType, operations, Hosts), getCmd(permissionType))
+  }
+
+  private def getCmd(permissionType: PermissionType): Array[String] = {
+    if (permissionType == Allow)
+      Array("--allow-principals", UsersString, "--allow-hosts", HostsString)
+    else
+      Array("--deny-principals", UsersString, "--deny-hosts", HostsString)
+  }
+
+  def getAuthorizer(props: Properties): Authorizer = {
+    val kafkaConfig = KafkaConfig.fromProps(props)
+    val authZ = new SimpleAclAuthorizer
+    authZ.configure(kafkaConfig.originals)
+
+    authZ
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5764e54d/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index bd37ce2..3276a79 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -141,6 +141,31 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testWildCardAcls(): Unit = {
+    assertFalse("when acls = [],  authorizer should fail close.", simpleAclAuthorizer.authorize(session,
Read, resource))
+
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val host1 = "host1"
+    val readAcl = new Acl(user1, Allow, host1, Read)
+    val wildCardResource = new Resource(resource.resourceType, Resource.WildCardResource)
+
+    val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
+
+    val host1Session = new Session(user1, host1)
+    assertTrue("User1 should have Read access from host1", simpleAclAuthorizer.authorize(host1Session,
Read, resource))
+
+    //allow Write to specific topic.
+    val writeAcl = new Acl(user1, Allow, host1, Write)
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl])
+
+    //deny Write to wild card topic.
+    val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1, Write)
+    changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource)
+
+    assertFalse("User1 should not have Write access from host1", simpleAclAuthorizer.authorize(host1Session,
Write, resource))
+  }
+
+  @Test
   def testNoAclFound() {
     assertFalse("when acls = [],  authorizer should fail close.", simpleAclAuthorizer.authorize(session,
Read, resource))
   }
@@ -175,11 +200,21 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     acls = changeAclAndVerify(acls, Set[Acl](acl5), Set.empty[Acl])
 
     //test get by principal name.
-    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1),
"changes not propogated in timeout period")
-    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2),
"changes not propogated in timeout period")
+    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1),
"changes not propagated in timeout period")
+    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2),
"changes not propagated in timeout period")
+
+    val resourceToAcls = Map[Resource, Set[Acl]](
+      new Resource(Topic, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow,
WildCardHost, Read)),
+      new Resource(Cluster, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow,
host1, Read)),
+      new Resource(ConsumerGroup, Resource.WildCardResource) -> acls,
+      new Resource(ConsumerGroup, "test-ConsumerGroup") -> acls
+    )
+
+    resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value,
Set.empty[Acl], key) }
+    assertEquals(resourceToAcls + (resource -> acls), simpleAclAuthorizer.getAcls())
 
     //test remove acl from existing acls.
-    changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
+    acls = changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
 
     //test remove all acls for resource
     simpleAclAuthorizer.removeAcls(resource)
@@ -213,7 +248,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertEquals(acls1, authorizer.getAcls(resource1))
   }
 
-  private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls:
Set[Acl]): Set[Acl] = {
+  private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls:
Set[Acl], resource: Resource = resource): Set[Acl] = {
     var acls = originalAcls
 
     if(addedAcls.nonEmpty) {


Mime
View raw message