kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2579; prevent unauthorized clients from joining groups
Date Fri, 25 Sep 2015 02:26:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 18d370124 -> fe798641a


KAFKA-2579; prevent unauthorized clients from joining groups

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Gwen Shapira

Closes #240 from hachikuji/KAFKA-2579


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe798641
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe798641
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe798641

Branch: refs/heads/trunk
Commit: fe798641a2ba5e60940fa91f5f01a14bcf98cf3e
Parents: 18d3701
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Sep 24 19:26:07 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Sep 24 19:26:07 2015 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe798641/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2b9531b..78bc576 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -689,35 +689,35 @@ class KafkaApis(val requestChannel: RequestChannel,
     val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
     val respHeader = new ResponseHeader(request.header.correlationId)
 
-    val (authorizedTopics, unauthorizedTopics) = joinGroupRequest.topics().partition { topic =>
-      authorize(request.session, Read, new Resource(Topic, topic)) &&
-        authorize(request.session, Read, new Resource(ConsumerGroup, joinGroupRequest.groupId()))
-    }
-
     // the callback for sending a join-group response
     def sendResponseCallback(partitions: Set[TopicAndPartition], consumerId: String, generationId: Int, errorCode: Short) {
-      val error = if (errorCode == ErrorMapping.NoError && unauthorizedTopics.nonEmpty) ErrorMapping.AuthorizationCode else errorCode
-
-      val partitionList = if (error == ErrorMapping.NoError)
+      val partitionList = if (errorCode == ErrorMapping.NoError)
         partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
       else
         List.empty.toBuffer
 
-      val responseBody = new JoinGroupResponse(error, generationId, consumerId, partitionList)
+      val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList)
 
       trace("Sending join group response %s for correlation id %d to client %s."
-              .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+        .format(responseBody, request.header.correlationId, request.header.clientId))
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
     }
 
-    // let the coordinator to handle join-group
-    coordinator.handleJoinGroup(
-      joinGroupRequest.groupId(),
-      joinGroupRequest.consumerId(),
-      authorizedTopics.toSet,
-      joinGroupRequest.sessionTimeout(),
-      joinGroupRequest.strategy(),
-      sendResponseCallback)
+    // ensure that the client is authorized to join the group and read from all subscribed topics
+    if (!authorize(request.session, Read, new Resource(ConsumerGroup, joinGroupRequest.groupId())) ||
+        joinGroupRequest.topics().exists(topic => !authorize(request.session, Read, new Resource(Topic, topic)))) {
+      val responseBody = new JoinGroupResponse(ErrorMapping.AuthorizationCode, 0, joinGroupRequest.consumerId(), List.empty[TopicPartition])
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+    } else {
+      // let the coordinator to handle join-group
+      coordinator.handleJoinGroup(
+        joinGroupRequest.groupId(),
+        joinGroupRequest.consumerId(),
+        joinGroupRequest.topics().toSet,
+        joinGroupRequest.sessionTimeout(),
+        joinGroupRequest.strategy(),
+        sendResponseCallback)
+    }
   }
 
   def handleHeartbeatRequest(request: RequestChannel.Request) {


Mime
View raw message