kafka-commits mailing list archives

From: gwens...@apache.org
Subject: [1/2] kafka git commit: KAFKA-2691: Improve handling of authorization failure during metadata refresh
Date: Wed, 04 Nov 2015 19:02:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c30ee50d8 -> c39e79bb5


http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e363e27..c8ca2a3 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -10,15 +10,14 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package integration.kafka.api
+package kafka.api
 
 import java.io.{DataInputStream, DataOutputStream}
 import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.concurrent.ExecutionException
-import java.util.{ArrayList, Properties}
+import java.util.{ArrayList, Collections, Properties}
 
-import kafka.api.RequestKeys
 import kafka.cluster.EndPoint
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.coordinator.GroupCoordinator
@@ -26,15 +25,13 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{ApiException, AuthorizationException, TimeoutException}
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.{TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 
@@ -76,42 +73,40 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT)
   
-  var RequestKeyToRequest: mutable.LinkedHashMap[Short, AbstractRequest] = null
-
   val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] =
-    Map(RequestKeys.MetadataKey -> classOf[MetadataResponse],
-      RequestKeys.ProduceKey -> classOf[ProduceResponse],
-      RequestKeys.FetchKey -> classOf[FetchResponse],
-      RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
-      RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
-      RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
-      RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse],
-      RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
+    Map(RequestKeys.MetadataKey -> classOf[requests.MetadataResponse],
+      RequestKeys.ProduceKey -> classOf[requests.ProduceResponse],
+      RequestKeys.FetchKey -> classOf[requests.FetchResponse],
+      RequestKeys.OffsetsKey -> classOf[requests.ListOffsetResponse],
+      RequestKeys.OffsetCommitKey -> classOf[requests.OffsetCommitResponse],
+      RequestKeys.OffsetFetchKey -> classOf[requests.OffsetFetchResponse],
+      RequestKeys.GroupCoordinatorKey -> classOf[requests.GroupCoordinatorResponse],
+      RequestKeys.UpdateMetadataKey -> classOf[requests.UpdateMetadataResponse],
       RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
       RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
       RequestKeys.HeartbeatKey -> classOf[HeartbeatResponse],
       RequestKeys.LeaveGroupKey -> classOf[LeaveGroupResponse],
-      RequestKeys.LeaderAndIsrKey -> classOf[LeaderAndIsrResponse],
-      RequestKeys.StopReplicaKey -> classOf[StopReplicaResponse],
-      RequestKeys.ControlledShutdownKey -> classOf[ControlledShutdownResponse]
+      RequestKeys.LeaderAndIsrKey -> classOf[requests.LeaderAndIsrResponse],
+      RequestKeys.StopReplicaKey -> classOf[requests.StopReplicaResponse],
+      RequestKeys.ControlledShutdownKey -> classOf[requests.ControlledShutdownResponse]
     )
 
   val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
-    RequestKeys.MetadataKey -> ((resp: MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
-    RequestKeys.ProduceKey -> ((resp: ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.FetchKey -> ((resp: FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
-    RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()),
-    RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
+    RequestKeys.MetadataKey -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
+    RequestKeys.ProduceKey -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.FetchKey -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.OffsetsKey -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.OffsetCommitKey -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
+    RequestKeys.OffsetFetchKey -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.GroupCoordinatorKey -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
+    RequestKeys.UpdateMetadataKey -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
     RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
     RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
     RequestKeys.HeartbeatKey -> ((resp: HeartbeatResponse) => resp.errorCode()),
     RequestKeys.LeaveGroupKey -> ((resp: LeaveGroupResponse) => resp.errorCode()),
-    RequestKeys.LeaderAndIsrKey -> ((resp: LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    RequestKeys.StopReplicaKey -> ((resp: StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    RequestKeys.ControlledShutdownKey -> ((resp: ControlledShutdownResponse) => resp.errorCode())
+    RequestKeys.LeaderAndIsrKey -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    RequestKeys.StopReplicaKey -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    RequestKeys.ControlledShutdownKey -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode())
   )
 
   val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
@@ -155,41 +150,6 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       servers.head.consumerCoordinator.offsetsTopicConfigs)
     // create the test topic with all the brokers as replicas
     TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
-
-    val joinReq = new JoinGroupRequest(group, 30000, JoinGroupRequest.UNKNOWN_MEMBER_ID, "consumer",
-      List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
-
-    //we have to get a join call so the group is created and we get back a memberId
-    addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
-    val socket = new Socket("localhost", servers.head.boundPort())
-    val joinResponse = sendRequestAndVerifyResponseErrorCode(socket, RequestKeys.JoinGroupKey, joinReq, ErrorMapping.NoError).asInstanceOf[JoinGroupResponse]
-    val memberId = joinResponse.memberId()
-
-    //remove group acls
-    removeAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
-
-    RequestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
-      RequestKeys.MetadataKey -> new MetadataRequest(List(topic).asJava),
-      RequestKeys.ProduceKey -> new ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava),
-      RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
-      RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
-      RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
-      RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group),
-      RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
-        Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
-        Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),
-      RequestKeys.JoinGroupKey -> new JoinGroupRequest(group, 30000, memberId, "consumer",
-        List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava),
-      RequestKeys.SyncGroupKey -> new SyncGroupRequest(group, 1, memberId, Map(memberId -> ByteBuffer.wrap("test".getBytes())).asJava),
-      RequestKeys.OffsetCommitKey -> new OffsetCommitRequest(group, 1, memberId, 1000, Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava),
-      RequestKeys.HeartbeatKey -> new HeartbeatRequest(group, 1, memberId),
-      RequestKeys.LeaveGroupKey -> new LeaveGroupRequest(group, memberId),
-      RequestKeys.LeaderAndIsrKey -> new LeaderAndIsrRequest(brokerId, Int.MaxValue,
-        Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
-        Set(new LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava),
-      RequestKeys.StopReplicaKey -> new StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava),
-      RequestKeys.ControlledShutdownKey -> new ControlledShutdownRequest(brokerId)
-    )
   }
 
   @After
@@ -198,108 +158,346 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     super.tearDown()
   }
 
+  private def createMetadataRequest = {
+    new requests.MetadataRequest(List(topic).asJava)
+  }
+
+  private def createProduceRequest = {
+    new requests.ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava)
+  }
+
+  private def createFetchRequest = {
+    new requests.FetchRequest(5000, 100, Map(tp -> new requests.FetchRequest.PartitionData(0, 100)).asJava)
+  }
+
+  private def createListOffsetsRequest = {
+    new requests.ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava)
+  }
+
+  private def createOffsetFetchRequest = {
+    new requests.OffsetFetchRequest(group, List(tp).asJava)
+  }
+
+  private def createGroupCoordinatorRequest = {
+    new requests.GroupCoordinatorRequest(group)
+  }
+
+  private def createUpdateMetadataRequest = {
+    val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+    val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
+      Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava
+    new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers)
+  }
+
+  private def createJoinGroupRequest = {
+    new JoinGroupRequest(group, 30000, "", "consumer",
+      List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+  }
+
+  private def createSyncGroupRequest = {
+    new SyncGroupRequest(group, 1, "", Map[String, ByteBuffer]().asJava)
+  }
+
+  private def createOffsetCommitRequest = {
+    new requests.OffsetCommitRequest(group, 1, "", 1000, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava)
+  }
+
+  private def createHeartbeatRequest = {
+    new HeartbeatRequest(group, 1, "")
+  }
+
+  private def createLeaveGroupRequest = {
+    new LeaveGroupRequest(group, "")
+  }
+
+  private def createLeaderAndIsrRequest = {
+    new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
+      Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+      Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava)
+  }
+
+  private def createStopReplicaRequest = {
+    new requests.StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava)
+  }
+
+  private def createControlledShutdownRequest = {
+    new requests.ControlledShutdownRequest(brokerId)
+  }
+
   @Test
   def testAuthorization() {
+    val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
+      RequestKeys.MetadataKey -> createMetadataRequest,
+      RequestKeys.ProduceKey -> createProduceRequest,
+      RequestKeys.FetchKey -> createFetchRequest,
+      RequestKeys.OffsetsKey -> createListOffsetsRequest,
+      RequestKeys.OffsetFetchKey -> createOffsetFetchRequest,
+      RequestKeys.GroupCoordinatorKey -> createGroupCoordinatorRequest,
+      RequestKeys.UpdateMetadataKey -> createUpdateMetadataRequest,
+      RequestKeys.JoinGroupKey -> createJoinGroupRequest,
+      RequestKeys.SyncGroupKey -> createSyncGroupRequest,
+      RequestKeys.OffsetCommitKey -> createOffsetCommitRequest,
+      RequestKeys.HeartbeatKey -> createHeartbeatRequest,
+      RequestKeys.LeaveGroupKey -> createLeaveGroupRequest,
+      RequestKeys.LeaderAndIsrKey -> createLeaderAndIsrRequest,
+      RequestKeys.StopReplicaKey -> createStopReplicaRequest,
+      RequestKeys.ControlledShutdownKey -> createControlledShutdownRequest
+    )
+
     val socket = new Socket("localhost", servers.head.boundPort())
 
-    for ((key, request) <- RequestKeyToRequest) {
+    for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
-
-      sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.AuthorizationCode)
-
+      val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+      sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = false)
       for ((resource, acls) <- RequestKeysToAcls(key))
         addAndVerifyAcls(acls, resource)
+      sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = true)
+    }
+  }
 
-      sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.NoError)
+  @Test
+  def testProduceWithNoTopicAccess() {
+    try {
+      sendRecords(numRecords, tp)
+      fail("sendRecords should have thrown")
+    } catch {
+      case e: TopicAuthorizationException =>
+        assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
     }
   }
 
-    @Test
-    def testProduceNeedsAuthorization() {
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
-      try {
-        sendRecords(numRecords, tp)
-        Assert.fail("should have thrown exception")
-      } catch {
-        case e: ApiException => Assert.assertEquals(Errors.AUTHORIZATION_FAILED.exception().getMessage, e.getMessage)
-      }
+  @Test
+  def testProduceWithTopicDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    try {
+      sendRecords(numRecords, tp)
+      fail("sendRecords should have thrown")
+    } catch {
+      case e: TopicAuthorizationException =>
+        assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
     }
+  }
 
-    @Test
-    def testOnlyWritePermissionAllowsWritingToProducer() {
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+  @Test
+  def testProduceWithTopicRead() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    try {
       sendRecords(numRecords, tp)
+      fail("sendRecords should have thrown")
+    } catch {
+      case e: TopicAuthorizationException =>
+        assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
     }
+  }
 
-    @Test
-    def testCreatePermissionNeededForWritingToNonExistentTopic() {
-      val newTopic = "newTopic"
-      val topicPartition = new TopicPartition(newTopic, 0)
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
-      try {
-        sendRecords(numRecords, topicPartition)
-        Assert.fail("should have thrown exception")
-      } catch {
-        case e: TimeoutException =>
-        //TODO Need to update the producer so it actually throws the server side of exception.
-        case e: Exception => Assert.fail(s"Only timeout exception should be thrown but $e thrown")
-      }
+  @Test
+  def testProduceWithTopicWrite() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(numRecords, tp)
+  }
 
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create),
-        new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+  @Test
+  def testCreatePermissionNeededForWritingToNonExistentTopic() {
+    val newTopic = "newTopic"
+    val topicPartition = new TopicPartition(newTopic, 0)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
+    try {
       sendRecords(numRecords, topicPartition)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
     }
 
-    @Test
-    def testConsumerNeedsAuthorization() {
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-      //TODO: Ideally we would want to test that when consumerGroup permission is not present we still get an AuthorizationException
-      //but the consumer fetcher currently waits forever for the consumer metadata to become available.
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-      sendRecords(1, tp)
-      try {
-        this.consumers.head.assign(List(tp).asJava)
-        consumeRecords(this.consumers.head)
-        Assert.fail("should have thrown exception")
-      } catch {
-        case e: AuthorizationException => Assert.assertEquals("Not authorized to read from topic-0", e.getMessage)
-      }
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+    sendRecords(numRecords, topicPartition)
+  }
+
+  @Test(expected = classOf[AuthorizationException])
+  def testConsumeWithNoAccess(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+    this.consumers.head.assign(List(tp).asJava)
+    consumeRecords(this.consumers.head)
+  }
+
+  @Test
+  def testConsumeWithNoGroupAccess(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    try {
+      this.consumers.head.assign(List(tp).asJava)
+      consumeRecords(this.consumers.head)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: GroupAuthorizationException => assertEquals(group, e.groupId())
+    }
+  }
+
+  @Test
+  def testConsumeWithNoTopicAccess() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    try {
+      this.consumers.head.assign(List(tp).asJava)
+      consumeRecords(this.consumers.head)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+    }
+  }
+
+  @Test
+  def testConsumeWithTopicDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    try {
+      this.consumers.head.assign(List(tp).asJava)
+      consumeRecords(this.consumers.head)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
     }
+  }
 
-    @Test
-    def testAllowingReadOnTopicAndGroupAllowsReading() {
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-      sendRecords(1, tp)
+  @Test
+  def testConsumeWithTopicWrite() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    try {
       this.consumers.head.assign(List(tp).asJava)
       consumeRecords(this.consumers.head)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: TopicAuthorizationException =>
+        assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+    }
+  }
+
+  @Test
+  def testConsumeWithTopicAndGroupRead() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    this.consumers.head.assign(List(tp).asJava)
+    consumeRecords(this.consumers.head)
+  }
+
+  @Test
+  def testCreatePermissionNeededToReadFromNonExistentTopic() {
+    val newTopic = "newTopic"
+    val topicPartition = new TopicPartition(newTopic, 0)
+    val newTopicResource = new Resource(Topic, newTopic)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
+    addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
+    addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
+    try {
+      this.consumers(0).assign(List(topicPartition).asJava)
+      consumeRecords(this.consumers(0))
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: TopicAuthorizationException =>
+        assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics());
     }
 
-//    TODO: The following test goes into an infinite loop as consumer waits for consumer metadata to be propogated for ever.
-//    @Test
-//    def testCreatePermissionNeededToReadFromNonExistentTopic() {
-//      val newTopic = "newTopic"
-//      val topicPartition = new TopicPartition(newTopic, 0)
-//      val newTopicResource = new Resource(Topic, newTopic)
-//      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
-//      addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
-//      addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
-//      try {
-//        this.consumers(0).assign(List(topicPartition).asJava)
-//        consumeRecords(this.consumers(0))
-//        Assert.fail("should have thrown exception")
-//      } catch {
-//        //checking for the message and type to ensure whenever these things are fixed on client side the test starts failing.
-//        case e: ApiException => Assert.assertEquals(e.getMessage, "Request is not authorized.")
-//      }
-//
-//      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
-//      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
-//
-//      sendRecords(numRecords, topicPartition)
-//      consumeRecords(this.consumers(0))
-//    }
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+
+    sendRecords(numRecords, topicPartition)
+    consumeRecords(this.consumers(0), topic = newTopic, part = 0)
+  }
+
+  @Test(expected = classOf[AuthorizationException])
+  def testCommitWithNoAccess() {
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+  }
+
+  @Test(expected = classOf[TopicAuthorizationException])
+  def testCommitWithNoTopicAccess() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+  }
+
+  @Test(expected = classOf[TopicAuthorizationException])
+  def testCommitWithTopicWrite() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+  }
+
+  @Test(expected = classOf[TopicAuthorizationException])
+  def testCommitWithTopicDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+  }
+
+  @Test(expected = classOf[GroupAuthorizationException])
+  def testCommitWithNoGroupAccess() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+  }
+
+  @Test
+  def testCommitWithTopicAndGroupRead() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+  }
+
+  @Test(expected = classOf[AuthorizationException])
+  def testOffsetFetchWithNoAccess() {
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.position(tp)
+  }
+
+  @Test(expected = classOf[GroupAuthorizationException])
+  def testOffsetFetchWithNoGroupAccess() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.position(tp)
+  }
+
+  @Test(expected = classOf[TopicAuthorizationException])
+  def testOffsetFetchWithNoTopicAccess() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.position(tp)
+  }
+
+  @Test
+  def testOffsetFetchTopicDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.position(tp)
+  }
+
+  @Test
+  def testOffsetFetchWithTopicAndGroupRead() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.position(tp)
+  }
 
   def removeAllAcls() = {
     servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
@@ -308,7 +506,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     }
   }
 
-  def sendRequestAndVerifyResponseErrorCode(socket: Socket, key: Short, request: AbstractRequest, expectedErrorCode: Short): AbstractRequestResponse = {
+  def sendRequestAndVerifyResponseErrorCode(socket: Socket,
+                                            key: Short,
+                                            request: AbstractRequest,
+                                            resources: Set[ResourceType],
+                                            isAuthorized: Boolean): AbstractRequestResponse = {
     val header = new RequestHeader(key, "client", 1)
     val body = request.toStruct
 
@@ -323,7 +525,14 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     ResponseHeader.parse(resp)
 
     val response = RequestKeyToResponseDeserializer(key).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
-    Assert.assertEquals(s"$key failed", expectedErrorCode, RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response))
+    val errorCode = RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response)
+
+    val possibleErrorCodes = resources.map(_.errorCode)
+    if (isAuthorized)
+      assertFalse(s"${ApiKeys.forId(key)} should be allowed", possibleErrorCodes.contains(errorCode))
+    else
+      assertTrue(s"${ApiKeys.forId(key)} should be forbidden", possibleErrorCodes.contains(errorCode))
+
     response
   }
 
@@ -364,8 +573,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   }
 
 
-  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int =
-  0) {
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
+                             numRecords: Int = 1,
+                             startingOffset: Int = 0,
+                             topic: String = topic,
+                             part: Int = part) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
     val maxIters = numRecords * 50
     var iters = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index d43fc53..2a5ca9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -25,6 +25,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +43,7 @@ public class DefaultPartitionGrouperTest {
             new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos);
+    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
 
     @Test
     public void testGrouping() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d8141d1..909df13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -77,7 +77,7 @@ public class StreamThreadTest {
             new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos);
+    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
 
     PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));
 

