kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2869; Host used by Authorizer should be IP address not hostname/IP
Date Fri, 20 Nov 2015 16:28:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 07e214130 -> 2b3f6d204


KAFKA-2869; Host used by Authorizer should be IP address not hostname/IP

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #567 from ijuma/kafka-2869-host-used-by-authorizer-should-be-ip

(cherry picked from commit d0614f97bc1c45734631770c46ab6a79fc8c8547)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b3f6d20
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b3f6d20
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b3f6d20

Branch: refs/heads/0.9.0
Commit: 2b3f6d204a75d1a9ca84b05a6b6a3388f2f16a81
Parents: 07e2141
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Nov 20 08:28:20 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Nov 20 08:28:28 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/network/RequestChannel.scala    |  5 ++-
 .../main/scala/kafka/network/SocketServer.scala |  7 ++--
 .../security/auth/SimpleAclAuthorizer.scala     |  4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 12 +++---
 .../security/auth/SimpleAclAuthorizerTest.scala | 41 ++++++++++----------
 5 files changed, 36 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9ea4079..4044f62 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -17,6 +17,7 @@
 
 package kafka.network
 
+import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.security.Principal
 import java.util.concurrent._
@@ -35,7 +36,7 @@ import org.apache.log4j.Logger
 
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, ""), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
+  val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
 
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -46,7 +47,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Session(principal: KafkaPrincipal, host: String)
+  case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)
 
   case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread and the writers are in the request

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index cb38153..69a9569 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -421,7 +421,8 @@ private[kafka] class Processor(val id: Int,
         selector.completedReceives.asScala.foreach { receive =>
           try {
             val channel = selector.channel(receive.source)
-            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal().getName), channel.socketDescription)
+            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
+              channel.socketAddress)
             val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
             requestChannel.sendRequest(req)
           } catch {
@@ -434,11 +435,11 @@ private[kafka] class Processor(val id: Int,
         }
 
         selector.completedSends.asScala.foreach { send =>
-          val resp = inflightResponses.remove(send.destination()).getOrElse {
+          val resp = inflightResponses.remove(send.destination).getOrElse {
             throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
           }
           resp.request.updateRequestMetrics()
-          selector.unmute(send.destination())
+          selector.unmute(send.destination)
         }
 
         selector.disconnected.asScala.foreach { connectionId =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 76645da..cae8f2a 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -110,7 +110,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
     val principal: KafkaPrincipal = session.principal
-    val host = session.host
+    val host = session.clientAddress.getHostAddress
     val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
 
     //check if there is any Deny acl match that would disallow this operation.
@@ -286,4 +286,4 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb50e40..ade879b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -791,12 +791,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       val protocols = joinGroupRequest.groupProtocols().map(protocol =>
         (protocol.name, Utils.toArray(protocol.metadata))).toList
       coordinator.handleJoinGroup(
-        joinGroupRequest.groupId(),
-        joinGroupRequest.memberId(),
-        request.header.clientId(),
-        request.session.host,
-        joinGroupRequest.sessionTimeout(),
-        joinGroupRequest.protocolType(),
+        joinGroupRequest.groupId,
+        joinGroupRequest.memberId,
+        request.header.clientId,
+        request.session.clientAddress.toString,
+        joinGroupRequest.sessionTimeout,
+        joinGroupRequest.protocolType,
         protocols,
         sendResponseCallback)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 76a768a..a4f61df 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -16,12 +16,13 @@
  */
 package kafka.security.auth
 
+import java.net.InetAddress
 import java.util.UUID
 
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
 import kafka.server.KafkaConfig
-import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
@@ -29,10 +30,10 @@ import org.junit.{Before, Test}
 
 class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
-  var simpleAclAuthorizer = new SimpleAclAuthorizer
+  val simpleAclAuthorizer = new SimpleAclAuthorizer
   val testPrincipal = Acl.WildCardPrincipal
-  val testHostName = "test.host.com"
-  var session = new Session(testPrincipal, testHostName)
+  val testHostName = InetAddress.getByName("192.168.0.1")
+  val session = new Session(testPrincipal, testHostName)
   var resource: Resource = null
   val superUsers = "User:superuser1; User:superuser2"
   val username = "alice"
@@ -55,18 +56,18 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
     val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
-    val host1 = "host1"
-    val host2 = "host2"
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val host2 = InetAddress.getByName("192.168.1.2")
 
     //user1 has READ access from host1 and host2.
-    val acl1 = new Acl(user1, Allow, host1, Read)
-    val acl2 = new Acl(user1, Allow, host2, Read)
+    val acl1 = new Acl(user1, Allow, host1.getHostAddress, Read)
+    val acl2 = new Acl(user1, Allow, host2.getHostAddress, Read)
 
     //user1 does not have  READ access from host1.
-    val acl3 = new Acl(user1, Deny, host1, Read)
+    val acl3 = new Acl(user1, Deny, host1.getHostAddress, Read)
 
     //user1 has Write access from host1 only.
-    val acl4 = new Acl(user1, Allow, host1, Write)
+    val acl4 = new Acl(user1, Allow, host1.getHostAddress, Write)
 
     //user1 has DESCRIBE access from all hosts.
     val acl5 = new Acl(user1, Allow, WildCardHost, Describe)
@@ -105,11 +106,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   @Test
   def testDenyTakesPrecedence() {
     val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host = "random-host"
+    val host = InetAddress.getByName("192.168.2.1")
     val session = new Session(user, host)
 
     val allowAll = Acl.AllowAllAcl
-    val denyAcl = new Acl(user, Deny, host, All)
+    val denyAcl = new Acl(user, Deny, host.getHostAddress, All)
     val acls = Set[Acl](allowAll, denyAcl)
 
     changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
@@ -123,7 +124,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
     changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
 
-    val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), "random.host")
+    val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4"))
     assertTrue("allow all acl should allow access to all.", simpleAclAuthorizer.authorize(session, Read, resource))
   }
 
@@ -133,8 +134,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
     changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
 
-    val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), "random.host")
-    val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), "random.host")
+    val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
+    val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4"))
 
     assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session1, Read, resource))
     assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Read, resource))
@@ -145,8 +146,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertFalse("when acls = [],  authorizer should fail close.", simpleAclAuthorizer.authorize(session, Read, resource))
 
     val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host1 = "host1"
-    val readAcl = new Acl(user1, Allow, host1, Read)
+    val host1 = InetAddress.getByName("192.168.3.1")
+    val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read)
     val wildCardResource = new Resource(resource.resourceType, Resource.WildCardResource)
 
     val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
@@ -155,11 +156,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertTrue("User1 should have Read access from host1", simpleAclAuthorizer.authorize(host1Session, Read, resource))
 
     //allow Write to specific topic.
-    val writeAcl = new Acl(user1, Allow, host1, Write)
+    val writeAcl = new Acl(user1, Allow, host1.getHostAddress, Write)
     changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl])
 
     //deny Write to wild card topic.
-    val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1, Write)
+    val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1.getHostAddress, Write)
     changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource)
 
     assertFalse("User1 should not have Write access from host1", simpleAclAuthorizer.authorize(host1Session, Write, resource))
@@ -265,4 +266,4 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
     acls
   }
-}
\ No newline at end of file
+}


Mime
View raw message