kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2210; KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation; patched by Parth Brahmbhatt; reviewed Ismael Juma and Jun Rao
Date Thu, 03 Sep 2015 01:09:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d02ca36ca -> 689d170ac


kafka-2210; KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation; patched by Parth Brahmbhatt; reviewed Ismael Juma and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/689d170a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/689d170a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/689d170a

Branch: refs/heads/trunk
Commit: 689d170ac32ed15af9098faa03a97609af14bf05
Parents: d02ca36
Author: Parth Brahmbhatt <pbrahmbhatt@hortonworks.com>
Authored: Wed Sep 2 18:09:13 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Sep 2 18:09:13 2015 -0700

----------------------------------------------------------------------
 .../common/network/PlaintextTransportLayer.java |   1 -
 .../apache/kafka/common/protocol/Errors.java    |   3 +-
 .../common/security/auth/KafkaPrincipal.java    |  69 ++++---
 .../security/auth/KafkaPrincipalTest.java       |  42 +++++
 .../main/scala/kafka/api/OffsetRequest.scala    |   2 +-
 .../kafka/common/AuthorizationException.scala   |  24 +++
 .../main/scala/kafka/common/ErrorMapping.scala  |  14 +-
 .../main/scala/kafka/security/auth/Acl.scala    | 111 +++++++++++
 .../scala/kafka/security/auth/Authorizer.scala  |  81 ++++++++
 .../scala/kafka/security/auth/Operation.scala   |  42 +++++
 .../kafka/security/auth/PermissionType.scala    |  46 +++++
 .../scala/kafka/security/auth/Resource.scala    |  44 +++++
 .../kafka/security/auth/ResourceType.scala      |  51 +++++
 .../src/main/scala/kafka/server/KafkaApis.scala | 188 +++++++++++++++----
 .../main/scala/kafka/server/KafkaConfig.scala   |  13 ++
 .../main/scala/kafka/server/KafkaServer.scala   |  12 +-
 .../unit/kafka/security/auth/AclTest.scala      |  44 +++++
 .../kafka/security/auth/OperationTest.scala     |  39 ++++
 .../security/auth/PermissionTypeTest.scala      |  39 ++++
 .../kafka/security/auth/ResourceTypeTest.scala  |  39 ++++
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +
 21 files changed, 842 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 35d4168..9bfa3a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -29,7 +29,6 @@ import java.nio.channels.SelectionKey;
 import java.security.Principal;
 
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 641afa1..b3415c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -87,7 +87,8 @@ public enum Errors {
     COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
             new ApiException("Some of the committing partitions are not assigned the committer")),
     INVALID_COMMIT_OFFSET_SIZE(28,
-            new ApiException("The committing offset data size is not valid"));
+            new ApiException("The committing offset data size is not valid")),
+    AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index b640ea0..06c59d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -5,55 +5,82 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.security.auth;
 
 import java.security.Principal;
 
 public class KafkaPrincipal implements Principal {
-    public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal("ANONYMOUS");
-    private final String name;
+    public static final String SEPARATOR = ":";
+    public static final String USER_TYPE = "User";
+    public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
+
+    private String principalType;
+    private String name;
 
-    public KafkaPrincipal(String name) {
-        if (name == null)
-            throw new IllegalArgumentException("name is null");
+    public KafkaPrincipal(String principalType, String name) {
+        if (principalType == null || name == null) {
+            throw new IllegalArgumentException("principalType and name can not be null");
+        }
+        this.principalType = principalType;
         this.name = name;
     }
 
-    @Override
-    public boolean equals(Object object) {
-        if (this == object)
-            return true;
+    public static KafkaPrincipal fromString(String str) {
+        if (str == null || str.isEmpty()) {
+            throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
+        }
+
+        String[] split = str.split(SEPARATOR, 2);
 
-        if (object instanceof KafkaPrincipal) {
-            return name.equals(((KafkaPrincipal) object).getName());
+        if (split == null || split.length != 2) {
+            throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
         }
 
-        return false;
+        return new KafkaPrincipal(split[0], split[1]);
     }
 
     @Override
-    public int hashCode() {
-        return name.hashCode();
+    public String toString() {
+        return principalType + SEPARATOR + name;
     }
 
     @Override
-    public String getName() {
-        return name;
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof KafkaPrincipal)) return false;
+
+        KafkaPrincipal that = (KafkaPrincipal) o;
+
+        if (!principalType.equals(that.principalType)) return false;
+        return name.equals(that.name);
+
     }
 
     @Override
-    public String toString() {
+    public int hashCode() {
+        int result = principalType.hashCode();
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+    @Override
+    public String getName() {
         return name;
     }
 
+    public String getPrincipalType() {
+        return principalType;
+    }
 }
+
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
new file mode 100644
index 0000000..051ad04
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KafkaPrincipalTest {
+
+    @Test
+    public void testPrincipalNameCanContainSeparator() {
+        String name = "name" + KafkaPrincipal.SEPARATOR + "with" + KafkaPrincipal.SEPARATOR + "in" + KafkaPrincipal.SEPARATOR + "it";
+
+        KafkaPrincipal principal = KafkaPrincipal.fromString(KafkaPrincipal.USER_TYPE + KafkaPrincipal.SEPARATOR + name);
+        Assert.assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+        Assert.assertEquals(name, principal.getName());
+    }
+
+    @Test
+    public void testEqualsAndHashCode() {
+        String name = "KafkaUser";
+        KafkaPrincipal principal1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name);
+        KafkaPrincipal principal2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name);
+
+        Assert.assertEquals(principal1.hashCode(), principal2.hashCode());
+        Assert.assertEquals(principal1, principal2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index f418868..d2c1c95 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -115,7 +115,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val partitionOffsetResponseMap = requestInfo.map {
       case (topicAndPartition, partitionOffsetRequest) =>
-        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
+        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/common/AuthorizationException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AuthorizationException.scala b/core/src/main/scala/kafka/common/AuthorizationException.scala
new file mode 100644
index 0000000..009cf1a
--- /dev/null
+++ b/core/src/main/scala/kafka/common/AuthorizationException.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+/**
+ * Exception thrown when a principal is not authorized to perform an operation.
+ * @param message
+ */
+class AuthorizationException(message: String) extends RuntimeException(message) {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index c75c685..23224ec 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -17,8 +17,10 @@
 
 package kafka.common
 
-import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
+
+import kafka.message.InvalidMessageException
+
 import scala.Predef._
 
 /**
@@ -51,6 +53,13 @@ object ErrorMapping {
   val NotEnoughReplicasAfterAppendCode: Short = 20
   // 21: InvalidRequiredAcks
   // 22: IllegalConsumerGeneration
+  // 23: INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY
+  // 24: UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY
+  // 25: UNKNOWN_CONSUMER_ID
+  // 26: INVALID_SESSION_TIMEOUT
+  // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
+  // 28: INVALID_COMMIT_OFFSET_SIZE
+  val AuthorizationCode: Short = 29;
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -72,7 +81,8 @@ object ErrorMapping {
       classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
       classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
       classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
-      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
+      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
+      classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
new file mode 100644
index 0000000..c23dd2d
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.auth
+
+import kafka.utils.Json
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+object Acl {
+  val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
+  val WildCardHost: String = "*"
+  val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
+  val PrincipalKey = "principal"
+  val PermissionTypeKey = "permissionType"
+  val OperationKey = "operation"
+  val HostsKey = "host"
+  val VersionKey = "version"
+  val CurrentVersion = 1
+  val AclsKey = "acls"
+
+  /**
+   *
+   * @param aclJson
+   *
+   * <p>
+      {
+        "version": 1,
+        "acls": [
+          {
+            "host":"host1",
+            "permissionType": "Deny",
+            "operation": "Read",
+            "principal": "User:alice"
+          }
+        ]
+      }
+   * </p>
+   *
+   * @return
+   */
+  def fromJson(aclJson: String): Set[Acl] = {
+    if (aclJson == null || aclJson.isEmpty)
+      return collection.immutable.Set.empty[Acl]
+
+    var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
+    Json.parseFull(aclJson) match {
+      case Some(m) =>
+        val aclMap = m.asInstanceOf[Map[String, Any]]
+        //the acl json version.
+        require(aclMap(VersionKey) == CurrentVersion)
+        val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
+        aclSet.foreach(item => {
+          val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
+          val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
+          val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
+          val host: String = item(HostsKey).asInstanceOf[String]
+          acls += new Acl(principal, permissionType, host, operation)
+        })
+      case None =>
+    }
+    acls.toSet
+  }
+
+  def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
+    Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap).toList)
+  }
+}
+
+/**
+ * An instance of this class will represent an acl that can express the following statement.
+ * <pre>
+ * Principal P has permissionType PT on Operation O1 from hosts H1.
+ * </pre>
+ * @param principal A value of *:* indicates all users.
+ * @param permissionType
+ * @param host A value of * indicates all hosts.
+ * @param operation A value of ALL indicates all operations.
+ */
+case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) {
+
+  /**
+   * TODO: Ideally we would have a symmetric toJson method but our current json library can not jsonify/dejsonify complex objects.
+   * @return Map representation of the Acl.
+   */
+  def toMap(): Map[String, Any] = {
+    Map(Acl.PrincipalKey -> principal.toString,
+      Acl.PermissionTypeKey -> permissionType.name,
+      Acl.OperationKey -> operation.name,
+      Acl.HostsKey -> host)
+  }
+
+  override def toString: String = {
+    "%s has %s permission for operations: %s from hosts: %s".format(principal, permissionType.name, operation, host)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
new file mode 100644
index 0000000..1f0547d
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.auth
+
+import kafka.network.RequestChannel.Session
+import kafka.server.KafkaConfig
+import org.apache.kafka.common.Configurable
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+/**
+ * Top level interface that all pluggable authorizers must implement. Kafka server will read "authorizer.class" config
+ * value at startup time, create an instance of the specified class and call initialize method.
+ * authorizer.class must be a class that implements this interface.
+ * If authorizer.class has no value specified, no authorization will be performed.
+ *
+ * From that point onwards, every client request will first be routed to authorize method and the request will only be
+ * authorized if the method returns true.
+ */
+trait Authorizer extends Configurable {
+
+  /**
+   * @param session The session being authenticated.
+   * @param operation Type of operation client is trying to perform on resource.
+   * @param resource Resource the client is trying to access.
+   * @return
+   */
+  def authorize(session: Session, operation: Operation, resource: Resource): Boolean
+
+  /**
+   * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
+   * acls will be added to existing acls.
+   * @param acls set of acls to add to existing acls
+   * @param resource the resource to which these acls should be attached.
+   */
+  def addAcls(acls: Set[Acl], resource: Resource): Unit
+
+  /**
+   * remove these acls from the resource.
+   * @param acls set of acls to be removed.
+   * @param resource resource from which the acls should be removed.
+   * @return true if some acl got removed, false if no acl was removed.
+   */
+  def removeAcls(acls: Set[Acl], resource: Resource): Boolean
+
+  /**
+   * remove a resource along with all of its acls from acl store.
+   * @param resource
+   * @return
+   */
+  def removeAcls(resource: Resource): Boolean
+
+  /**
+   * get set of acls for this resource
+   * @param resource
+   * @return empty set if no acls are found, otherwise the acls for the resource.
+   */
+  def getAcls(resource: Resource): Set[Acl]
+
+  /**
+   * get the acls for this principal.
+   * @param principal
+   * @return empty set if no acls exist for this principal, otherwise the acls for the principal.
+   */
+  def getAcls(principal: KafkaPrincipal): Set[Acl]
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
new file mode 100644
index 0000000..c172e1e
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import kafka.common.KafkaException
+
+/**
+ * Different operations a client may perform on kafka resources.
+ */
+
+sealed trait Operation { def name: String}
+case object Read extends Operation { val name = "Read" }
+case object Write extends Operation { val name = "Write" }
+case object Create extends Operation { val name = "Create" }
+case object Delete extends Operation { val name = "Delete" }
+case object Alter extends Operation { val name = "Alter" }
+case object Describe extends Operation { val name = "Describe" }
+case object ClusterAction extends Operation { val name = "ClusterAction" }
+case object All extends Operation { val name = "All" }
+
+object Operation {
+   def fromString(operation: String): Operation = {
+      val op = values.find(op => op.name.equalsIgnoreCase(operation))
+      op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
+   }
+
+   def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, All)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
new file mode 100644
index 0000000..e668d83
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import kafka.common.KafkaException
+
+/**
+ * PermissionType.
+ */
+
+
+sealed trait PermissionType {
+  def name: String
+}
+
+case object Allow extends PermissionType {
+  val name = "Allow"
+}
+
+case object Deny extends PermissionType {
+  val name = "Deny"
+}
+
+object PermissionType {
+  def fromString(permissionType: String): PermissionType = {
+    val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType))
+    pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(",")))
+  }
+
+  def values: Seq[PermissionType] = List(Allow, Deny)
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
new file mode 100644
index 0000000..d4c46a9
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+object Resource {
+  val Separator = ":"
+  val ClusterResourceName = "kafka-cluster"
+  val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName)
+
+  def fromString(str: String): Resource = {
+    str.split(Separator, 2) match {
+      case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name)
+      case s => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
+    }
+  }
+}
+
+/**
+ *
+ * @param resourceType type of resource.
+ * @param name name of the resource, for topic this will be topic name, for group it will be group name. For cluster type
+ *             it will be a constant string kafka-cluster.
+ */
+case class Resource(val resourceType: ResourceType, val name: String) {
+
+  override def toString: String = {
+    resourceType.name + Resource.Separator + name
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
new file mode 100644
index 0000000..a0696e1
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import kafka.common.KafkaException
+
+/**
+ * ResourceTypes.
+ */
+
+
+sealed trait ResourceType {
+  def name: String
+}
+
+case object Cluster extends ResourceType {
+  val name = "Cluster"
+}
+
+case object Topic extends ResourceType {
+  val name = "Topic"
+}
+
+case object ConsumerGroup extends ResourceType {
+  val name = "ConsumerGroup"
+}
+
+
+object ResourceType {
+
+  def fromString(resourceType: String): ResourceType = {
+    val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType))
+    rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
+  }
+
+  def values: Seq[ResourceType] = List(Cluster, Topic, ConsumerGroup)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a3a8df0..2b9531b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import kafka.message.MessageSet
+import kafka.security.auth.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
@@ -27,11 +29,13 @@ import kafka.controller.KafkaController
 import kafka.coordinator.ConsumerCoordinator
 import kafka.log._
 import kafka.network._
-import kafka.network.RequestChannel.Response
+import kafka.network.RequestChannel.{Session, Response}
 import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend}
 import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
+import kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Describe, Resource, Topic, Operation, ConsumerGroup}
+
 
 /**
  * Logic to handle the various Kafka requests
@@ -44,7 +48,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val brokerId: Int,
                 val config: KafkaConfig,
                 val metadataCache: MetadataCache,
-                val metrics: Metrics) extends Logging {
+                val metrics: Metrics,
+                val authorizer: Option[Authorizer]) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
   // Store all the quota managers for each type of request
@@ -98,6 +103,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
+
+    authorizeClusterAction(request)
+
     try {
       // call replica manager to handle updating partitions to become leader or follower
       val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
@@ -129,6 +137,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
+
+    authorizeClusterAction(request)
+
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, stopReplicaResponse)))
@@ -137,6 +148,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
     val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+
+    authorizeClusterAction(request)
+
     replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
 
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
@@ -148,6 +162,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
     val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
+
+    authorizeClusterAction(request)
+
     val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
     val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
       ErrorMapping.NoError, partitionsRemaining)
@@ -167,26 +184,34 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
     val filteredRequestInfo = (offsetCommitRequest.requestInfo -- invalidRequestsInfo.keys)
 
+    val (authorizedRequestInfo, unauthorizedRequestInfo) =  filteredRequestInfo.partition {
+      case (topicAndPartition, offsetMetadata) =>
+        authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) &&
+          authorize(request.session, Read, new Resource(ConsumerGroup, offsetCommitRequest.groupId))
+    }
+
     // the callback for sending an offset commit response
     def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
-      commitStatus.foreach { case (topicAndPartition, errorCode) =>
+      val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.AuthorizationCode)
+
+      mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
         // we only print warnings for known errors here; only replica manager could see an unknown
         // exception while trying to write the offset message to the local log, and it will log
         // an error message and write the error code in this case; hence it can be ignored here
         if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) {
           debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
             .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
-            topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
+              topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
         }
       }
-      val combinedCommitStatus = commitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode)
+      val combinedCommitStatus = mergedCommitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode)
       val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
     }
 
     if (offsetCommitRequest.versionId == 0) {
       // for version 0 always store offsets to ZK
-      val responseInfo = filteredRequestInfo.map {
+      val responseInfo = authorizedRequestInfo.map {
         case (topicAndPartition, metaAndError) => {
           val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
           try {
@@ -227,8 +252,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       //   - If v2 we use the default expiration timestamp
       val currentTimestamp = SystemTime.milliseconds
       val defaultExpireTimestamp = offsetRetention + currentTimestamp
-
-      val offsetData = filteredRequestInfo.mapValues(offsetAndMetadata =>
+      val offsetData = authorizedRequestInfo.mapValues(offsetAndMetadata =>
         offsetAndMetadata.copy(
           commitTimestamp = currentTimestamp,
           expireTimestamp = {
@@ -250,6 +274,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
+    authorizer.map(_.authorize(session, operation, resource)).getOrElse(true)
+
   /**
    * Handle a produce request
    */
@@ -257,11 +284,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
     val numBytesAppended = produceRequest.sizeInBytes
 
+    val (authorizedRequestInfo, unauthorizedRequestInfo) =  produceRequest.data.partition  {
+      case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
+    }
+
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
       var errorInResponse = false
-      responseStatus.foreach
-      { case (topicAndPartition, status) =>
+      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.AuthorizationCode, -1))
+
+      mergedResponseStatus.foreach { case (topicAndPartition, status) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
         // an error message in the replica manager
         if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
@@ -290,7 +322,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
         } else {
           val response = ProducerResponse(produceRequest.correlationId,
-                                          responseStatus,
+                                          mergedResponseStatus,
                                           produceRequest.versionId,
                                           delayTimeMs)
           requestChannel.sendResponse(new RequestChannel.Response(request,
@@ -313,7 +345,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       produceRequest.ackTimeoutMs.toLong,
       produceRequest.requiredAcks,
       internalTopicsAllowed,
-      produceRequest.data,
+      authorizedRequestInfo,
       sendResponseCallback)
 
     // if the request is put into the purgatory, it will have a held reference
@@ -328,9 +360,17 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
 
+    val (authorizedRequestInfo, unauthorizedRequestInfo) =  fetchRequest.requestInfo.partition {
+      case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
+    }
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.AuthorizationCode, -1, MessageSet.Empty))
+
     // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
-      responsePartitionData.foreach { case (topicAndPartition, data) =>
+      val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus
+
+      mergedResponseStatus.foreach { case (topicAndPartition, data) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
         // an error message in the replica manager already and hence can be ignored here
         if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) {
@@ -365,7 +405,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       fetchRequest.maxWait.toLong,
       fetchRequest.replicaId,
       fetchRequest.minBytes,
-      fetchRequest.requestInfo,
+      authorizedRequestInfo,
       sendResponseCallback)
   }
 
@@ -374,11 +414,18 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
-    val responseMap = offsetRequest.requestInfo.map(elem => {
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.requestInfo.partition  {
+      case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
+    }
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.AuthorizationCode, Nil))
+
+    val responseMap = authorizedRequestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
         // ensure leader exists
-        val localReplica = if(!offsetRequest.isFromDebuggingClient)
+        val localReplica = if (!offsetRequest.isFromDebuggingClient)
           replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
         else
           replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition)
@@ -414,7 +461,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
       }
     })
-    val response = OffsetResponse(offsetRequest.correlationId, responseMap)
+
+    val mergedResponseMap = responseMap ++ unauthorizedResponseStatus
+    val response = OffsetResponse(offsetRequest.correlationId, mergedResponseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
   }
 
@@ -433,14 +482,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
-    if(segsArray.last.size > 0)
+    if (segsArray.last.size > 0)
       offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
     else
       offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 
     for(i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
-    if(segsArray.last.size > 0)
+    if (segsArray.last.size > 0)
       offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
 
     var startIndex = -1
@@ -517,22 +566,58 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
+
+    //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
+    val topics = if (metadataRequest.topics.isEmpty) {
+      val topicResponses = metadataCache.getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
+      topicResponses.map(_.topic).filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))).toSet
+    } else {
+      metadataRequest.topics.toSet
+    }
+
+    //when topics is empty this will be a duplicate authorization check but given this should just be a cache lookup, it should not matter.
+    var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
+
+    if (!authorizedTopics.isEmpty) {
+      val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol)
+      if (config.autoCreateTopicsEnable && topicResponses.size != authorizedTopics.size) {
+        val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet
+        authorizer.foreach {
+          az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
+            authorizedTopics --= nonExistentTopics
+            unauthorizedTopics ++= nonExistentTopics
+          }
+        }
+      }
+    }
+
+    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode))
+
+    val topicMetadata = getTopicMetadata(authorizedTopics, request.securityProtocol)
     val brokers = metadataCache.getAliveBrokers
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId)
+    val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata  ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
   }
 
   /*
    * Handle an offset fetch request
    */
+
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
 
+    val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
+      authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
+        authorize(request.session, Read, new Resource(ConsumerGroup, offsetFetchRequest.groupId))
+    }
+
+    val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode)
+    val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
+
     val response = if (offsetFetchRequest.versionId == 0) {
       // version 0 reads offsets from ZK
-      val responseInfo = offsetFetchRequest.requestInfo.map( topicAndPartition => {
+      val responseInfo = authorizedTopicPartitions.map( topicAndPartition => {
         val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
         try {
           if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
@@ -551,15 +636,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       })
 
-      OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
+      val unauthorizedTopics = unauthorizedTopicPartitions.map( topicAndPartition =>
+        (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,ErrorMapping.AuthorizationCode)))
+      OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedTopics, offsetFetchRequest.correlationId)
     } else {
       // version 1 reads offsets from Kafka;
-      val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap
+      val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
 
       // Note that we do not need to filter the partitions in the
       // metadata cache as the topic partitions will be filtered
       // in coordinator's offset manager through the offset cache
-      OffsetFetchResponse(offsets, offsetFetchRequest.correlationId)
+      OffsetFetchResponse(offsets ++ unauthorizedStatus, offsetFetchRequest.correlationId)
     }
 
     trace("Sending offset fetch response %s for correlation id %d to client %s."
@@ -576,7 +663,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val partition = coordinator.partitionFor(consumerMetadataRequest.group)
 
-    // get metadata (and create the topic if necessary)
+    if (!authorize(request.session, Read, new Resource(ConsumerGroup, consumerMetadataRequest.group)))
+      throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized to read from consumer group " + consumerMetadataRequest.group)
+
+    //get metadata (and create the topic if necessary)
     val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head
 
     val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
@@ -599,10 +689,22 @@ class KafkaApis(val requestChannel: RequestChannel,
     val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
     val respHeader = new ResponseHeader(request.header.correlationId)
 
+    val (authorizedTopics, unauthorizedTopics) = joinGroupRequest.topics().partition { topic =>
+      authorize(request.session, Read, new Resource(Topic, topic)) &&
+        authorize(request.session, Read, new Resource(ConsumerGroup, joinGroupRequest.groupId()))
+    }
+
     // the callback for sending a join-group response
     def sendResponseCallback(partitions: Set[TopicAndPartition], consumerId: String, generationId: Int, errorCode: Short) {
-      val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
-      val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList)
+      val error = if (errorCode == ErrorMapping.NoError && unauthorizedTopics.nonEmpty) ErrorMapping.AuthorizationCode else errorCode
+
+      val partitionList = if (error == ErrorMapping.NoError)
+        partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
+      else
+        List.empty.toBuffer
+
+      val responseBody = new JoinGroupResponse(error, generationId, consumerId, partitionList)
+
       trace("Sending join group response %s for correlation id %d to client %s."
               .format(responseBody, request.header.correlationId, request.header.clientId))
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
@@ -612,7 +714,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     coordinator.handleJoinGroup(
       joinGroupRequest.groupId(),
       joinGroupRequest.consumerId(),
-      joinGroupRequest.topics().toSet,
+      authorizedTopics.toSet,
       joinGroupRequest.sessionTimeout(),
       joinGroupRequest.strategy(),
       sendResponseCallback)
@@ -626,16 +728,22 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(errorCode: Short) {
       val response = new HeartbeatResponse(errorCode)
       trace("Sending heartbeat response %s for correlation id %d to client %s."
-              .format(response, request.header.correlationId, request.header.clientId))
+        .format(response, request.header.correlationId, request.header.clientId))
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
     }
 
-    // let the coordinator to handle heartbeat
-    coordinator.handleHeartbeat(
-      heartbeatRequest.groupId(),
-      heartbeatRequest.consumerId(),
-      heartbeatRequest.groupGenerationId(),
-      sendResponseCallback)
+    if (!authorize(request.session, Read, new Resource(ConsumerGroup, heartbeatRequest.groupId))) {
+      val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode)
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
+    }
+    else {
+      // let the coordinator to handle heartbeat
+      coordinator.handleHeartbeat(
+        heartbeatRequest.groupId(),
+        heartbeatRequest.consumerId(),
+        heartbeatRequest.groupGenerationId(),
+        sendResponseCallback)
+    }
   }
 
   /*
@@ -666,9 +774,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def close() {
-    quotaManagers.foreach { case(apiKey, quotaManager) =>
+    quotaManagers.foreach { case (apiKey, quotaManager) =>
       quotaManager.shutdown()
     }
     info("Shutdown complete.")
   }
+
+  def authorizeClusterAction(request: RequestChannel.Request): Unit = {
+    if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
+      throw new AuthorizationException(s"Request $request is not authorized.")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d547a01..1e8b233 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -52,6 +52,9 @@ object Defaults {
   val BackgroundThreads = 10
   val QueuedMaxRequests = 500
 
+  /************* Authorizer Configuration ***********/
+  val AuthorizerClassName = ""
+
   /** ********* Socket Server Configuration ***********/
   val Port = 9092
   val HostName: String = new String("")
@@ -194,6 +197,8 @@ object KafkaConfig {
   val NumIoThreadsProp = "num.io.threads"
   val BackgroundThreadsProp = "background.threads"
   val QueuedMaxRequestsProp = "queued.max.requests"
+  /************* Authorizer Configuration ***********/
+  val AuthorizerClassNameProp = "authorizer.class.name"
   /** ********* Socket Server Configuration ***********/
   val PortProp = "port"
   val HostNameProp = "host.name"
@@ -335,6 +340,8 @@ object KafkaConfig {
   val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
   val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+  /************* Authorizer Configuration ***********/
+  val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization"
   /** ********* Socket Server Configuration ***********/
   val PortDoc = "the port to listen and accept connections on"
   val HostNameDoc = "hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces"
@@ -509,6 +516,9 @@ object KafkaConfig {
       .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
       .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
 
+      /************* Authorizer Configuration ***********/
+      .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
+
       /** ********* Socket Server Configuration ***********/
       .define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
       .define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
@@ -684,6 +694,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
   val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
 
+  /************* Authorizer Configuration ***********/
+  val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)
+
   /** ********* Socket Server Configuration ***********/
   val hostName = getString(KafkaConfig.HostNameProp)
   val port = getInt(KafkaConfig.PortProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 756cf77..30406ce 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -29,6 +29,7 @@ import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.{IOException, File}
 
+import kafka.security.auth.Authorizer
 import kafka.utils._
 import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient}
 import org.apache.kafka.common.Node
@@ -188,9 +189,18 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler)
         consumerCoordinator.startup()
 
+        /* Get the authorizer and initialize it if one is specified.*/
+        val authorizer: Option[Authorizer] = if (config.authorizerClassName != null && !config.authorizerClassName.isEmpty) {
+          val authZ: Authorizer = CoreUtils.createObject(config.authorizerClassName)
+          authZ.configure(config.originals())
+          Option(authZ)
+        } else {
+          None
+        }
+
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
-          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics)
+          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
new file mode 100644
index 0000000..d4de179
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.security.auth
+
+import kafka.security.auth._
+import kafka.utils.Json
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.{Test, Assert}
+import org.scalatest.junit.JUnitSuite
+
+class AclTest extends JUnitSuite {
+
+  // Hand-written JSON document describing the same three ACLs built below,
+  // with deliberately irregular whitespace and mixed-case operation names
+  // to exercise lenient parsing.
+  val AclJson = "{\"version\": 1, \"acls\": [{\"host\": \"host1\",\"permissionType\": \"Deny\",\"operation\": \"READ\", \"principal\": \"User:alice\"  },  " +
+    "{  \"host\":  \"*\" ,  \"permissionType\": \"Allow\",  \"operation\":  \"Read\", \"principal\": \"User:bob\"  },  " +
+    "{  \"host\": \"host1\",  \"permissionType\": \"Deny\",  \"operation\":   \"Read\" ,  \"principal\": \"User:bob\"}  ]}"
+
+  @Test
+  def testAclJsonConversion(): Unit = {
+    val alice = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")
+    val bob = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+
+    val expectedAcls = Set[Acl](
+      new Acl(alice, Deny, "host1", Read),
+      new Acl(bob, Allow, "*", Read),
+      new Acl(bob, Deny, "host1", Read))
+
+    // Round-trip: encoding to JSON and parsing back must be lossless.
+    val encoded = Json.encode(Acl.toJsonCompatibleMap(expectedAcls))
+    Assert.assertEquals(expectedAcls, Acl.fromJson(encoded))
+
+    // Parsing the externally authored JSON must yield the same ACL set.
+    Assert.assertEquals(expectedAcls, Acl.fromJson(AclJson))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
new file mode 100644
index 0000000..2f15f9f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.security.auth
+
+import kafka.common.{KafkaException}
+import kafka.security.auth.{Operation, Read}
+import org.junit.{Test, Assert}
+import org.scalatest.junit.JUnitSuite
+
+class OperationTest extends JUnitSuite {
+
+  /**
+   * Verifies that Operation.fromString resolves a known name (case-insensitively,
+   * as exercised by the upper-case "READ") and rejects an unknown name with a
+   * KafkaException.
+   */
+  @Test
+  def testFromString(): Unit = {
+    val op = Operation.fromString("READ")
+    Assert.assertEquals(Read, op)
+
+    // intercept (inherited from ScalaTest Assertions, like the fail() this
+    // replaces) fails the test unless a KafkaException is thrown, avoiding the
+    // manual try/fail/catch whose catch arm was a dead string expression.
+    intercept[KafkaException] {
+      Operation.fromString("badName")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
new file mode 100644
index 0000000..0518985
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.security.auth
+
+import kafka.common.KafkaException
+import kafka.security.auth.{Allow, PermissionType}
+import org.junit.{Test, Assert}
+import org.scalatest.junit.JUnitSuite
+
+class PermissionTypeTest extends JUnitSuite {
+
+  /**
+   * Verifies that PermissionType.fromString resolves a known name and rejects
+   * an unknown name with a KafkaException.
+   */
+  @Test
+  def testFromString(): Unit = {
+    val permissionType = PermissionType.fromString("Allow")
+    Assert.assertEquals(Allow, permissionType)
+
+    // intercept (inherited from ScalaTest Assertions, like the fail() this
+    // replaces) fails the test unless a KafkaException is thrown, avoiding the
+    // manual try/fail/catch whose catch arm was a dead string expression.
+    intercept[KafkaException] {
+      PermissionType.fromString("badName")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
new file mode 100644
index 0000000..a632d37
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.security.auth
+
+import kafka.common.KafkaException
+import kafka.security.auth.{ResourceType, Topic}
+import org.junit.{Test, Assert}
+import org.scalatest.junit.JUnitSuite
+
+class ResourceTypeTest extends JUnitSuite {
+
+  /**
+   * Verifies that ResourceType.fromString resolves a known name and rejects
+   * an unknown name with a KafkaException.
+   */
+  @Test
+  def testFromString(): Unit = {
+    val resourceType = ResourceType.fromString("Topic")
+    Assert.assertEquals(Topic, resourceType)
+
+    // intercept (inherited from ScalaTest Assertions, like the fail() this
+    // replaces) fails the test unless a KafkaException is thrown, avoiding the
+    // manual try/fail/catch whose catch arm was a dead string expression.
+    intercept[KafkaException] {
+      ResourceType.fromString("badName")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/689d170a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 3da666f..5b4f2db 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -405,6 +405,8 @@ class KafkaConfigTest {
         case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
 
+        case KafkaConfig.AuthorizerClassNameProp => //ignore string
+
         case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.HostNameProp => // ignore string
         case KafkaConfig.AdvertisedHostNameProp => //ignore string


Mime
View raw message