kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2582; ConsumerMetdata authorization error not returned to user
Date Fri, 25 Sep 2015 22:40:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 10d5deac0 -> ad120d578


KAFKA-2582; ConsumerMetdata authorization error not returned to user

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #245 from hachikuji/KAFKA-2582


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ad120d57
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ad120d57
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ad120d57

Branch: refs/heads/trunk
Commit: ad120d5789306209ab8d110fcbd86d453d01641b
Parents: 10d5dea
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Sep 25 15:40:38 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Sep 25 15:40:38 2015 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 32 +++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ad120d57/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 78bc576..72f3044 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -661,26 +661,28 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleConsumerMetadataRequest(request: RequestChannel.Request) {
     val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
 
-    val partition = coordinator.partitionFor(consumerMetadataRequest.group)
-
-    if (!authorize(request.session, Read, new Resource(ConsumerGroup, consumerMetadataRequest.group)))
-      throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized
to read from consumer group " + consumerMetadataRequest.group)
+    if (!authorize(request.session, Read, new Resource(ConsumerGroup, consumerMetadataRequest.group)))
{
+      val response = ConsumerMetadataResponse(None, ErrorMapping.AuthorizationCode, consumerMetadataRequest.correlationId)
+      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId,
response)))
+    } else {
+      val partition = coordinator.partitionFor(consumerMetadataRequest.group)
 
-    //get metadata (and create the topic if necessary)
-    val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName),
request.securityProtocol).head
+      //get metadata (and create the topic if necessary)
+      val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName),
request.securityProtocol).head
 
-    val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode,
consumerMetadataRequest.correlationId)
+      val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode,
consumerMetadataRequest.correlationId)
 
-    val response =
-      offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata
=>
-        partitionMetadata.leader.map { leader =>
-          ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId)
+      val response =
+        offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata
=>
+          partitionMetadata.leader.map { leader =>
+            ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId)
+          }.getOrElse(errorResponse)
         }.getOrElse(errorResponse)
-      }.getOrElse(errorResponse)
 
-    trace("Sending consumer metadata %s for correlation id %d to client %s."
-          .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
-    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId,
response)))
+      trace("Sending consumer metadata %s for correlation id %d to client %s."
+        .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
+      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId,
response)))
+    }
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {


Mime
View raw message