kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6750: Add listener name to authentication context (KIP-282) (#4829)
Date Tue, 05 Jun 2018 12:23:43 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a166f8  KAFKA-6750: Add listener name to authentication context (KIP-282) (#4829)
8a166f8 is described below

commit 8a166f8c28cf046cba0c596d781f1daeca795ccd
Author: Mickael Maison <mimaison@users.noreply.github.com>
AuthorDate: Tue Jun 5 13:23:10 2018 +0100

    KAFKA-6750: Add listener name to authentication context (KIP-282) (#4829)
    
    PrincipalBuilder implementations can now take the listener into account
    when creating the Principal. This is especially interesting in deployments
    where inter-broker traffic is on a different listener than client traffic or
    when the same protocol is used by multiple listeners.
    
    The change in itself is mostly "plumbing" as the listener name needs to be
    passed from ChannelBuilders all the way down to all classes implementing
    AuthenticationContext.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
    
    Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
    Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
---
 .../kafka/common/network/ChannelBuilders.java      |  2 +-
 .../common/network/PlaintextChannelBuilder.java    | 20 +++++++++++++++++---
 .../kafka/common/network/SslChannelBuilder.java    | 14 +++++++++++---
 .../security/auth/AuthenticationContext.java       |  9 ++++++++-
 .../auth/PlaintextAuthenticationContext.java       |  9 ++++++++-
 .../security/auth/SaslAuthenticationContext.java   | 10 +++++++++-
 .../security/auth/SslAuthenticationContext.java    |  9 ++++++++-
 .../authenticator/SaslServerAuthenticator.java     |  2 +-
 .../kafka/common/network/ChannelBuildersTest.java  |  4 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  4 ++--
 .../common/network/SslTransportLayerTest.java      | 11 ++++-------
 .../auth/DefaultKafkaPrincipalBuilderTest.java     | 22 ++++++++++++++--------
 .../kafka/api/EndToEndAuthorizationTest.scala      |  4 ++--
 .../api/PlaintextEndToEndAuthorizationTest.scala   | 19 ++++++++++++++++++-
 14 files changed, 105 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 078d844..1a64396 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -131,7 +131,7 @@ public class ChannelBuilders {
                         tokenCache);
                 break;
             case PLAINTEXT:
-                channelBuilder = new PlaintextChannelBuilder();
+                channelBuilder = new PlaintextChannelBuilder(listenerName);
                 break;
             default:
                 throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index c0d1059..c7b80cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -33,8 +33,17 @@ import java.util.Map;
 
 public class PlaintextChannelBuilder implements ChannelBuilder {
     private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
+    private final ListenerName listenerName;
     private Map<String, ?> configs;
 
+    /**
+     * Constructs a plaintext channel builder. ListenerName is non-null whenever
+     * it's instantiated in the broker and null otherwise.
+     */
+    public PlaintextChannelBuilder(ListenerName listenerName) {
+        this.listenerName = listenerName;
+    }
+
     public void configure(Map<String, ?> configs) throws KafkaException {
         this.configs = configs;
     }
@@ -43,7 +52,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool
memoryPool) throws KafkaException {
         try {
             PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
-            PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer);
+            PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer,
listenerName);
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
@@ -58,10 +67,12 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
     private static class PlaintextAuthenticator implements Authenticator {
         private final PlaintextTransportLayer transportLayer;
         private final KafkaPrincipalBuilder principalBuilder;
+        private final ListenerName listenerName;
 
-        private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer
transportLayer) {
+        private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer
transportLayer, ListenerName listenerName) {
             this.transportLayer = transportLayer;
             this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer,
this, null);
+            this.listenerName = listenerName;
         }
 
         @Override
@@ -70,7 +81,10 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
         @Override
         public KafkaPrincipal principal() {
             InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress();
-            return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress));
+            // listenerName should only be null in Client mode where principal() should not
be called
+            if (listenerName == null)
+                throw new IllegalStateException("Unexpected call to principal() when listenerName
is null");
+            return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress,
listenerName.value()));
         }
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 941c455..b43e4c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -89,7 +89,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool
memoryPool) throws KafkaException {
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
-            Authenticator authenticator = new SslAuthenticator(configs, transportLayer);
+            Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName);
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
@@ -152,10 +152,12 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
     private static class SslAuthenticator implements Authenticator {
         private final SslTransportLayer transportLayer;
         private final KafkaPrincipalBuilder principalBuilder;
+        private final ListenerName listenerName;
 
-        private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer)
{
+        private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer,
ListenerName listenerName) {
             this.transportLayer = transportLayer;
             this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer,
this, null);
+            this.listenerName = listenerName;
         }
         /**
          * No-Op for plaintext authenticator
@@ -170,7 +172,13 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
         @Override
         public KafkaPrincipal principal() {
             InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress();
-            SslAuthenticationContext context = new SslAuthenticationContext(transportLayer.sslSession(),
clientAddress);
+            // listenerName should only be null in Client mode where principal() should not
be called
+            if (listenerName == null)
+                throw new IllegalStateException("Unexpected call to principal() when listenerName
is null");
+            SslAuthenticationContext context = new SslAuthenticationContext(
+                    transportLayer.sslSession(),
+                    clientAddress,
+                    listenerName.value());
             return principalBuilder.build(context);
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
index b8c0847..a8abea8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
@@ -18,9 +18,11 @@ package org.apache.kafka.common.security.auth;
 
 import java.net.InetAddress;
 
+
 /**
  * An object representing contextual information from the authentication session. See
- * {@link SaslAuthenticationContext} and {@link SslAuthenticationContext}.
+ * {@link PlaintextAuthenticationContext}, {@link SaslAuthenticationContext}
+ * and {@link SslAuthenticationContext}. This class is only used in the broker.
  */
 public interface AuthenticationContext {
     /**
@@ -32,4 +34,9 @@ public interface AuthenticationContext {
      * Address of the authenticated client
      */
     InetAddress clientAddress();
+
+    /**
+     * Name of the listener used for the connection
+     */
+    String listenerName();
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
index bc14d36..a111f21 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
@@ -20,9 +20,11 @@ import java.net.InetAddress;
 
 public class PlaintextAuthenticationContext implements AuthenticationContext {
     private final InetAddress clientAddress;
+    private final String listenerName;
 
-    public PlaintextAuthenticationContext(InetAddress clientAddress) {
+    public PlaintextAuthenticationContext(InetAddress clientAddress, String listenerName)
{
         this.clientAddress = clientAddress;
+        this.listenerName = listenerName;
     }
 
     @Override
@@ -35,4 +37,9 @@ public class PlaintextAuthenticationContext implements AuthenticationContext
{
         return clientAddress;
     }
 
+    @Override
+    public String listenerName() {
+        return listenerName;
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
index 89e6063..719d041 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
@@ -17,17 +17,20 @@
 package org.apache.kafka.common.security.auth;
 
 import javax.security.sasl.SaslServer;
+
 import java.net.InetAddress;
 
 public class SaslAuthenticationContext implements AuthenticationContext {
     private final SaslServer server;
     private final SecurityProtocol securityProtocol;
     private final InetAddress clientAddress;
+    private final String listenerName;
 
-    public SaslAuthenticationContext(SaslServer server, SecurityProtocol securityProtocol,
InetAddress clientAddress) {
+    public SaslAuthenticationContext(SaslServer server, SecurityProtocol securityProtocol,
InetAddress clientAddress, String listenerName) {
         this.server = server;
         this.securityProtocol = securityProtocol;
         this.clientAddress = clientAddress;
+        this.listenerName = listenerName;
     }
 
     public SaslServer server() {
@@ -43,4 +46,9 @@ public class SaslAuthenticationContext implements AuthenticationContext
{
     public InetAddress clientAddress() {
         return clientAddress;
     }
+
+    @Override
+    public String listenerName() {
+        return listenerName;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
index d87a892..88819f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
@@ -22,10 +22,12 @@ import java.net.InetAddress;
 public class SslAuthenticationContext implements AuthenticationContext {
     private final SSLSession session;
     private final InetAddress clientAddress;
+    private final String listenerName;
 
-    public SslAuthenticationContext(SSLSession session, InetAddress clientAddress) {
+    public SslAuthenticationContext(SSLSession session, InetAddress clientAddress, String
listenerName) {
         this.session = session;
         this.clientAddress = clientAddress;
+        this.listenerName = listenerName;
     }
 
     public SSLSession session() {
@@ -41,4 +43,9 @@ public class SslAuthenticationContext implements AuthenticationContext {
     public InetAddress clientAddress() {
         return clientAddress;
     }
+
+    @Override
+    public String listenerName() {
+        return listenerName;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 5140afb..f2cc621 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -282,7 +282,7 @@ public class SaslServerAuthenticator implements Authenticator {
 
     @Override
     public KafkaPrincipal principal() {
-        SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol,
clientAddress());
+        SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol,
clientAddress(), listenerName.value());
         KafkaPrincipal principal = principalBuilder.build(context);
         if (ScramMechanism.isScram(saslMechanism) && Boolean.parseBoolean((String)
saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) {
             principal.tokenAuthenticated(true);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
index de210e7..c2b89fe 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.easymock.EasyMock;
 import org.junit.Test;
 
@@ -38,7 +39,6 @@ import static org.junit.Assert.assertTrue;
 public class ChannelBuildersTest {
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testCreateOldPrincipalBuilder() throws Exception {
         TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
         Authenticator authenticator = EasyMock.mock(Authenticator.class);
@@ -51,7 +51,7 @@ public class ChannelBuildersTest {
         assertTrue(OldPrincipalBuilder.configured);
 
         // test delegation
-        KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost()));
+        KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost(),
SecurityProtocol.PLAINTEXT.name()));
         assertEquals(OldPrincipalBuilder.PRINCIPAL_NAME, principal.getName());
         assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index f1c6a5a..8ce8d50 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -80,7 +80,7 @@ public class SelectorTest {
         this.server = new EchoServer(SecurityProtocol.PLAINTEXT, configs);
         this.server.start();
         this.time = new MockTime();
-        this.channelBuilder = new PlaintextChannelBuilder();
+        this.channelBuilder = new PlaintextChannelBuilder(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
         this.channelBuilder.configure(configs);
         this.metrics = new Metrics();
         this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder,
new LogContext());
@@ -305,7 +305,7 @@ public class SelectorTest {
 
     @Test
     public void registerFailure() throws Exception {
-        ChannelBuilder channelBuilder = new PlaintextChannelBuilder() {
+        ChannelBuilder channelBuilder = new PlaintextChannelBuilder(null) {
             @Override
             public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
                     MemoryPool memoryPool) throws KafkaException {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index f5af400..1f62c10 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -447,8 +447,7 @@ public class SslTransportLayerTest {
      */
     @Test
     public void testInvalidSecureRandomImplementation() throws Exception {
-        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
-        try {
+        try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null,
false)) {
             sslClientConfigs.put(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
             fail("SSL channel configured with invalid SecureRandom implementation");
@@ -462,8 +461,7 @@ public class SslTransportLayerTest {
      */
     @Test
     public void testInvalidTruststorePassword() throws Exception {
-        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
-        try {
+        try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null,
false)) {
             sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
             fail("SSL channel configured with invalid truststore password");
@@ -477,8 +475,7 @@ public class SslTransportLayerTest {
      */
     @Test
     public void testInvalidKeystorePassword() throws Exception {
-        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
-        try {
+        try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null,
false)) {
             sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
             fail("SSL channel configured with invalid keystore password");
@@ -785,7 +782,7 @@ public class SslTransportLayerTest {
 
     @Test
     public void testClosePlaintext() throws Exception {
-        testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder());
+        testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder(null));
     }
 
     private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder)
throws Exception {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index fdf3687..8b2e8b2 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -53,19 +53,21 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport
{
         DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
                 transportLayer, oldPrincipalBuilder, null);
 
-        KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost()));
+        KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(
+                InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
         assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
         assertEquals("foo", principal.getName());
 
         builder.close();
-
         verifyAll();
     }
 
     @Test
     public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
-        assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost())));
+        assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
+                new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
+        builder.close();
     }
 
     @Test
@@ -86,12 +88,12 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport
{
         DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
                 transportLayer, oldPrincipalBuilder, null);
 
-        KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost()));
+        KafkaPrincipal principal = builder.build(
+                new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
         assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
         assertEquals("foo", principal.getName());
 
         builder.close();
-
         verifyAll();
     }
 
@@ -105,10 +107,12 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport
{
 
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
 
-        KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost()));
+        KafkaPrincipal principal = builder.build(
+                new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
         assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
         assertEquals("foo", principal.getName());
 
+        builder.close();
         verifyAll();
     }
 
@@ -124,10 +128,11 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport
{
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
 
         KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
-                SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost()));
+                SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
         assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
         assertEquals("foo", principal.getName());
 
+        builder.close();
         verifyAll();
     }
 
@@ -146,10 +151,11 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport
{
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
 
         KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
-                SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost()));
+                SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
         assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
         assertEquals("foo", principal.getName());
 
+        builder.close();
         verifyAll();
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index a2e5fd9..4189ce3 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -358,7 +358,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
     }
   }
 
-  private def sendRecords(numRecords: Int, tp: TopicPartition) {
+  protected final def sendRecords(numRecords: Int, tp: TopicPartition) {
     val futures = (0 until numRecords).map { i =>
       val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
       debug(s"Sending this record: $record")
@@ -371,7 +371,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
     }
   }
 
-  protected def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
+  protected final def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
                              numRecords: Int = 1,
                              startingOffset: Int = 0,
                              topic: String = topic,
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index 6279340..0b2fbca 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -19,13 +19,20 @@ package kafka.api
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth._
-import org.junit.Before
+import org.junit.{Before, Test}
+import org.junit.Assert._
+import org.apache.kafka.common.errors.TopicAuthorizationException
 
 // This test case uses a separate listener for client and inter-broker communication, from
 // which we derive corresponding principals
 object PlaintextEndToEndAuthorizationTest {
+  @volatile
+  private var clientListenerName = None: Option[String]
+  @volatile
+  private var serverListenerName = None: Option[String]
   class TestClientPrincipalBuilder extends KafkaPrincipalBuilder {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
+      clientListenerName = Some(context.listenerName)
       context match {
         case ctx: PlaintextAuthenticationContext if ctx.clientAddress != null =>
           new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
@@ -37,6 +44,7 @@ object PlaintextEndToEndAuthorizationTest {
 
   class TestServerPrincipalBuilder extends KafkaPrincipalBuilder {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
+      serverListenerName = Some(context.listenerName)
       context match {
         case ctx: PlaintextAuthenticationContext =>
           new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
@@ -67,4 +75,13 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest
{
     super.setUp()
   }
 
+  @Test
+  def testListenerName() {
+    // To check the client listener name, establish a session on the server by sending any
request eg sendRecords
+    intercept[TopicAuthorizationException](sendRecords(1, tp))
+
+    assertEquals(Some("CLIENT"), PlaintextEndToEndAuthorizationTest.clientListenerName)
+    assertEquals(Some("SERVER"), PlaintextEndToEndAuthorizationTest.serverListenerName)
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.

Mime
View raw message