kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Safer handling of requests prior to SASL authentication
Date Mon, 14 Aug 2017 13:58:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 f68d67201 -> fb47e213e


MINOR: Safer handling of requests prior to SASL authentication

This implements two improvements for request handling prior to SASL authentication:

1. Only parse request types that are allowed prior to authentication.
2. Limit the maximum request size to 512 KB prior to authentication (the default limit is otherwise 100 MB).

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3558 from hachikuji/minor-restrict-presasl-request-parsing

(cherry picked from commit 8265a43897b3da88839df061ce22559820087ad9)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb47e213
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb47e213
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb47e213

Branch: refs/heads/0.11.0
Commit: fb47e213ead7c23abe57c258bdf249682d47f8ae
Parents: f68d672
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Aug 14 14:05:40 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Aug 14 14:58:15 2017 +0100

----------------------------------------------------------------------
 .../common/network/SaslChannelBuilder.java      |   2 +-
 .../authenticator/SaslServerAuthenticator.java  |  22 ++--
 .../SaslServerAuthenticatorTest.java            | 119 +++++++++++++++++++
 3 files changed, 132 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb47e213/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 33a9f4d..e124afd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -107,7 +107,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             Authenticator authenticator;
             if (mode == Mode.SERVER)
                 authenticator = new SaslServerAuthenticator(id, jaasContext, loginManager.subject(),
-                        kerberosShortNamer, socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize,
+                        kerberosShortNamer, socketChannel.socket().getLocalAddress().getHostName(),
                         credentialCache);
             else
                 authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb47e213/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 62b4039..ca2d3eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.Authenticator;
-import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.NetworkSend;
@@ -33,12 +32,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
@@ -73,6 +73,8 @@ import java.util.Set;
 
 public class SaslServerAuthenticator implements Authenticator {
 
+    // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
+    static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
     public enum SaslState {
@@ -83,7 +85,6 @@ public class SaslServerAuthenticator implements Authenticator {
     private final JaasContext jaasContext;
     private final Subject subject;
     private final KerberosShortNamer kerberosNamer;
-    private final int maxReceiveSize;
     private final String host;
     private final CredentialCache credentialCache;
 
@@ -104,14 +105,15 @@ public class SaslServerAuthenticator implements Authenticator {
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
 
-    public SaslServerAuthenticator(String node, JaasContext jaasContext, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize, CredentialCache credentialCache) throws IOException {
+    public SaslServerAuthenticator(String node, JaasContext jaasContext, final Subject subject,
+                                   KerberosShortNamer kerberosNameParser, String host,
+                                   CredentialCache credentialCache) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         this.node = node;
         this.jaasContext = jaasContext;
         this.subject = subject;
         this.kerberosNamer = kerberosNameParser;
-        this.maxReceiveSize = maxReceiveSize;
         this.host = host;
         this.credentialCache = credentialCache;
     }
@@ -209,7 +211,7 @@ public class SaslServerAuthenticator implements Authenticator {
             return;
         }
 
-        if (netInBuffer == null) netInBuffer = new NetworkReceive(maxReceiveSize, node);
+        if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, node);
 
         netInBuffer.readFrom(transportLayer);
 
@@ -309,16 +311,16 @@ public class SaslServerAuthenticator implements Authenticator {
                 else
                     throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + apiKey);
             } else {
-                AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(),
-                        requestBuffer).request;
-
                 LOG.debug("Handle Kafka request {}", apiKey);
                 switch (apiKey) {
                     case API_VERSIONS:
                         handleApiVersionsRequest(requestHeader);
                         break;
                     case SASL_HANDSHAKE:
-                        clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
+                        short version = requestHeader.apiVersion();
+                        Struct struct = ApiKeys.SASL_HANDSHAKE.parseRequest(version, requestBuffer);
+                        SaslHandshakeRequest saslHandshakeRequest = new SaslHandshakeRequest(struct, version);
+                        clientMechanism = handleHandshakeRequest(requestHeader, saslHandshakeRequest);
                         break;
                     default:
                         throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb47e213/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
new file mode 100644
index 0000000..b76f3cc
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.network.InvalidReceiveException;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import javax.security.auth.Subject;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.security.scram.ScramMechanism.SCRAM_SHA_256;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SaslServerAuthenticatorTest {
+
+    @Test(expected = InvalidReceiveException.class)
+    public void testOversizeRequest() throws IOException {
+        SaslServerAuthenticator authenticator = setupAuthenticator();
+        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
+        PrincipalBuilder principalBuilder = null; // SASL authenticator does not currently use principal builder
+        Map<String, ?> configs = Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS,
+                Collections.singletonList(SCRAM_SHA_256.mechanismName()));
+
+        final Capture<ByteBuffer> size = EasyMock.newCapture();
+        EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
+            @Override
+            public Integer answer() throws Throwable {
+                size.getValue().putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
+                return 4;
+            }
+        });
+
+        EasyMock.replay(transportLayer);
+
+        authenticator.configure(transportLayer, principalBuilder, configs);
+        authenticator.authenticate();
+    }
+
+    @Test
+    public void testUnexpectedRequestType() throws IOException {
+        SaslServerAuthenticator authenticator = setupAuthenticator();
+        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
+        PrincipalBuilder principalBuilder = null; // SASL authenticator does not currently use principal builder
+        Map<String, ?> configs = Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS,
+                Collections.singletonList(SCRAM_SHA_256.mechanismName()));
+
+        final RequestHeader header = new RequestHeader(ApiKeys.METADATA.id, (short) 0, "clientId", 13243);
+        final Struct headerStruct = header.toStruct();
+
+        final Capture<ByteBuffer> size = EasyMock.newCapture();
+        EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
+            @Override
+            public Integer answer() throws Throwable {
+                size.getValue().putInt(headerStruct.sizeOf());
+                return 4;
+            }
+        });
+
+        final Capture<ByteBuffer> payload = EasyMock.newCapture();
+        EasyMock.expect(transportLayer.read(EasyMock.capture(payload))).andAnswer(new IAnswer<Integer>() {
+            @Override
+            public Integer answer() throws Throwable {
+                // serialize only the request header. the authenticator should not parse beyond this
+                headerStruct.writeTo(payload.getValue());
+                return headerStruct.sizeOf();
+            }
+        });
+
+        EasyMock.replay(transportLayer);
+
+        authenticator.configure(transportLayer, principalBuilder, configs);
+        try {
+            authenticator.authenticate();
+            fail("Expected authenticate() to raise an exception");
+        } catch (IOException e) {
+            assertTrue(e.getCause() instanceof IllegalSaslStateException);
+        }
+    }
+
+    private SaslServerAuthenticator setupAuthenticator() throws IOException {
+        TestJaasConfig jaasConfig = new TestJaasConfig();
+        jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
+        JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
+        Subject subject = new Subject();
+        return new SaslServerAuthenticator("node", jaasContext, subject, null, "localhost", new CredentialCache());
+    }
+
+}


Mime
View raw message