kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7182: SASL/OAUTHBEARER client response missing %x01 seps (#5391)
Date Thu, 19 Jul 2018 16:53:52 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new e4d2f6c  KAFKA-7182: SASL/OAUTHBEARER client response missing %x01 seps (#5391)
e4d2f6c is described below

commit e4d2f6c32efc865c76ea59c879273fc3ae1fdde2
Author: Ron Dagostino <rndgstn@gmail.com>
AuthorDate: Thu Jul 19 12:53:47 2018 -0400

    KAFKA-7182: SASL/OAUTHBEARER client response missing %x01 seps (#5391)
    
    The SASL/OAUTHBEARER client response as currently implemented in OAuthBearerSaslClient
sends the valid gs2-header "n,," but then sends the "auth" key and value immediately after
it.
    This does not conform to the specification because there is no %x01 after the gs2-header,
no %x01 after the auth value, and no terminating %x01. Fixed this and the parsing of the client
response in
    OAuthBearerSaslServer, which currently allows the malformed text. Also updated to accept
and ignore unknown properties as required by the spec.
    
    Reviewers: Stanislav Kozlovski <familyguyuser192@windowslive.com>, Rajini Sivaram
<rajinisivaram@googlemail.com>
---
 .../OAuthBearerClientInitialResponse.java          | 96 ++++++++++++++++++++++
 .../internals/OAuthBearerSaslClient.java           |  3 +-
 .../internals/OAuthBearerSaslServer.java           | 30 ++-----
 .../security/scram/internals/ScramExtensions.java  | 29 +------
 .../java/org/apache/kafka/common/utils/Utils.java  | 13 +++
 .../OAuthBearerClientInitialResponseTest.java      | 65 +++++++++++++++
 .../internals/OAuthBearerSaslServerTest.java       | 22 ++---
 7 files changed, 197 insertions(+), 61 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
new file mode 100644
index 0000000..8d4b18a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals;
+
+import org.apache.kafka.common.utils.Utils;
+
+import javax.security.sasl.SaslException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class OAuthBearerClientInitialResponse {
+    static final String SEPARATOR = "\u0001";
+
+    private static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+";
+    private static final String KEY = "[A-Za-z]+";
+    private static final String VALUE = "[\\x21-\\x7E \t\r\n]+";
+    private static final String KVPAIRS = String.format("(%s=%s%s)*", KEY, VALUE, SEPARATOR);
+    private static final Pattern AUTH_PATTERN = Pattern.compile("(?<scheme>[\\w]+)[
]+(?<token>[-_\\.a-zA-Z0-9]+)");
+    private static final Pattern CLIENT_INITIAL_RESPONSE_PATTERN = Pattern.compile(
+            String.format("n,(a=(?<authzid>%s))?,%s(?<kvpairs>%s)%s", SASLNAME,
SEPARATOR, KVPAIRS, SEPARATOR));
+    private static final String AUTH_KEY = "auth";
+
+    private final String tokenValue;
+    private final String authorizationId;
+    private final Map<String, String> properties;
+
+    public OAuthBearerClientInitialResponse(byte[] response) throws SaslException {
+        String responseMsg = new String(response, StandardCharsets.UTF_8);
+        Matcher matcher = CLIENT_INITIAL_RESPONSE_PATTERN.matcher(responseMsg);
+        if (!matcher.matches())
+            throw new SaslException("Invalid OAUTHBEARER client first message");
+        String authzid = matcher.group("authzid");
+        this.authorizationId = authzid == null ? "" : authzid;
+        String kvPairs = matcher.group("kvpairs");
+        this.properties = Utils.parseMap(kvPairs, "=", SEPARATOR);
+        String auth = properties.get(AUTH_KEY);
+        if (auth == null)
+            throw new SaslException("Invalid OAUTHBEARER client first message: 'auth' not
specified");
+
+        Matcher authMatcher = AUTH_PATTERN.matcher(auth);
+        if (!authMatcher.matches())
+            throw new SaslException("Invalid OAUTHBEARER client first message: invalid 'auth'
format");
+        if (!"bearer".equalsIgnoreCase(authMatcher.group("scheme"))) {
+            String msg = String.format("Invalid scheme in OAUTHBEARER client first message:
%s",
+                    matcher.group("scheme"));
+            throw new SaslException(msg);
+        }
+        this.tokenValue = authMatcher.group("token");
+    }
+
+    public OAuthBearerClientInitialResponse(String tokenValue) {
+        this(tokenValue, "", new HashMap<>());
+    }
+
+    public OAuthBearerClientInitialResponse(String tokenValue, String authorizationId, Map<String,
String> props) {
+        this.tokenValue = tokenValue;
+        this.authorizationId = authorizationId == null ? "" : authorizationId;
+        this.properties = new HashMap<>(props);
+    }
+
+    public byte[] toBytes() {
+        String authzid = authorizationId.isEmpty() ? "" : "a=" + authorizationId;
+        String message = String.format("n,%s,%sauth=Bearer %s%s%s", authzid,
+                SEPARATOR, tokenValue, SEPARATOR, SEPARATOR);
+        return message.getBytes(StandardCharsets.UTF_8);
+    }
+
+    public String tokenValue() {
+        return tokenValue;
+    }
+
+    public String authorizationId() {
+        return authorizationId;
+    }
+
+    public String propertyValue(String name) {
+        return properties.get(name);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
index 66942ba..4d4ee57 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
@@ -88,8 +88,7 @@ public class OAuthBearerSaslClient implements SaslClient {
                         throw new SaslException("Expected empty challenge");
                     callbackHandler().handle(new Callback[] {callback});
                     setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
-                    return String.format("n,,auth=Bearer %s", callback.token().value())
-                            .getBytes(StandardCharsets.UTF_8);
+                    return new OAuthBearerClientInitialResponse(callback.token().value()).toBytes();
                 case RECEIVE_SERVER_FIRST_MESSAGE:
                     if (challenge != null && challenge.length != 0) {
                         String jsonErrorResponse = new String(challenge, StandardCharsets.UTF_8);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
index 5d1f224..aacc8fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
@@ -21,8 +21,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -48,13 +46,9 @@ import org.slf4j.LoggerFactory;
  * for example).
  */
 public class OAuthBearerSaslServer implements SaslServer {
-    private static final String INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE = "Invalid OAUTHBEARER
client first message";
     private static final Logger log = LoggerFactory.getLogger(OAuthBearerSaslServer.class);
     private static final String NEGOTIATED_PROPERTY_KEY_TOKEN = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM
+ ".token";
     private static final String INTERNAL_ERROR_ON_SERVER = "Authentication could not be performed
due to an internal error on the server";
-    private static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+";
-    private static final Pattern CLIENT_INITIAL_RESPONSE_PATTERN = Pattern.compile(
-            String.format("n,(a=(?<authzid>%s))?,auth=(?<scheme>[\\w]+)[ ]+(?<token>[-_\\.a-zA-Z0-9]+)",
SASLNAME));
 
     private final AuthenticateCallbackHandler callbackHandler;
 
@@ -90,24 +84,14 @@ public class OAuthBearerSaslServer implements SaslServer {
             throw new SaslAuthenticationException(errorMessage);
         }
         errorMessage = null;
-        String responseMsg = new String(response, StandardCharsets.UTF_8);
-        Matcher matcher = CLIENT_INITIAL_RESPONSE_PATTERN.matcher(responseMsg);
-        if (!matcher.matches()) {
-            if (log.isDebugEnabled())
-                log.debug(INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE);
-            throw new SaslException(INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE);
-        }
-        String authzid = matcher.group("authzid");
-        String authorizationId = authzid != null ? authzid : "";
-        if (!"bearer".equalsIgnoreCase(matcher.group("scheme"))) {
-            String msg = String.format("Invalid scheme in OAUTHBEARER client first message:
%s",
-                    matcher.group("scheme"));
-            if (log.isDebugEnabled())
-                log.debug(msg);
-            throw new SaslException(msg);
+        OAuthBearerClientInitialResponse clientResponse;
+        try {
+            clientResponse = new OAuthBearerClientInitialResponse(response);
+        } catch (SaslException e) {
+            log.debug(e.getMessage());
+            throw e;
         }
-        String tokenValue = matcher.group("token");
-        return process(tokenValue, authorizationId);
+        return process(clientResponse.tokenValue(), clientResponse.authorizationId());
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
index cbfca13..5028329 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
@@ -17,9 +17,9 @@
 package org.apache.kafka.common.security.scram.internals;
 
 import org.apache.kafka.common.security.scram.ScramLoginModule;
+import org.apache.kafka.common.utils.Utils;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -31,7 +31,7 @@ public class ScramExtensions {
     }
 
     public ScramExtensions(String extensions) {
-        this(stringToMap(extensions));
+        this(Utils.parseMap(extensions, "=", ","));
     }
 
     public ScramExtensions(Map<String, String> extensionMap) {
@@ -52,29 +52,6 @@ public class ScramExtensions {
 
     @Override
     public String toString() {
-        return mapToString(extensionMap);
-    }
-
-    private static Map<String, String> stringToMap(String extensions) {
-        Map<String, String> extensionMap = new HashMap<>();
-
-        if (!extensions.isEmpty()) {
-            String[] attrvals = extensions.split(",");
-            for (String attrval : attrvals) {
-                String[] array = attrval.split("=", 2);
-                extensionMap.put(array[0], array[1]);
-            }
-        }
-        return extensionMap;
-    }
-
-    private static String mapToString(Map<String, String> extensionMap) {
-        StringBuilder builder = new StringBuilder();
-        for (Map.Entry<String, String> entry : extensionMap.entrySet()) {
-            builder.append(entry.getKey());
-            builder.append('=');
-            builder.append(entry.getValue());
-        }
-        return builder.toString();
+        return Utils.mkString(extensionMap, "", "", "=", ",");
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 31fa01c..330f968 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -512,6 +512,19 @@ public final class Utils {
         return bld.toString();
     }
 
+    public static Map<String, String> parseMap(String mapStr, String keyValueSeparator,
String elementSeparator) {
+        Map<String, String> map = new HashMap<>();
+
+        if (!mapStr.isEmpty()) {
+            String[] attrvals = mapStr.split(elementSeparator);
+            for (String attrval : attrvals) {
+                String[] array = attrval.split(keyValueSeparator, 2);
+                map.put(array[0], array[1]);
+            }
+        }
+        return map;
+    }
+
     /**
      * Read a properties file from the given path
      * @param filename The path of the file to read
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
new file mode 100644
index 0000000..eccf2dd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class OAuthBearerClientInitialResponseTest {
+
+    @Test
+    public void testToken() throws Exception {
+        String message = "n,,\u0001auth=Bearer 123.345.567\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("123.345.567", response.tokenValue());
+        assertEquals("", response.authorizationId());
+    }
+
+    @Test
+    public void testAuthorizationId() throws Exception {
+        String message = "n,a=myuser,\u0001auth=Bearer 345\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("345", response.tokenValue());
+        assertEquals("myuser", response.authorizationId());
+    }
+
+    @Test
+    public void testProperties() throws Exception {
+        String message = "n,,\u0001propA=valueA1, valueA2\u0001auth=Bearer 567\u0001propB=valueB\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("567", response.tokenValue());
+        assertEquals("", response.authorizationId());
+        assertEquals("valueA1, valueA2", response.propertyValue("propA"));
+        assertEquals("valueB", response.propertyValue("propB"));
+    }
+
+    // The example in the RFC uses `vF9dft4qmTc2Nvb3RlckBhbHRhdmlzdGEuY29tCg==` as the token
+    // But since we use Base64Url encoding, padding is omitted. Hence this test verifies
without '='.
+    @Test
+    public void testRfc7688Example() throws Exception {
+        String message = "n,a=user@example.com,\u0001host=server.example.com\u0001port=143\u0001"
+
+                "auth=Bearer vF9dft4qmTc2Nvb3RlckBhbHRhdmlzdGEuY29tCg\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("vF9dft4qmTc2Nvb3RlckBhbHRhdmlzdGEuY29tCg", response.tokenValue());
+        assertEquals("user@example.com", response.authorizationId());
+        assertEquals("server.example.com", response.propertyValue("host"));
+        assertEquals("143", response.propertyValue("port"));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index bf21f2b..6b53e96 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -75,39 +75,41 @@ public class OAuthBearerSaslServerTest {
     @Test
     public void noAuthorizationIdSpecified() throws Exception {
         byte[] nextChallenge = saslServer
-                .evaluateResponse(clientInitialResponseText(null).getBytes(StandardCharsets.UTF_8));
+                .evaluateResponse(clientInitialResponse(null));
         assertTrue("Next challenge is not empty", nextChallenge.length == 0);
     }
 
     @Test
     public void authorizatonIdEqualsAuthenticationId() throws Exception {
         byte[] nextChallenge = saslServer
-                .evaluateResponse(clientInitialResponseText(USER).getBytes(StandardCharsets.UTF_8));
+                .evaluateResponse(clientInitialResponse(USER));
         assertTrue("Next challenge is not empty", nextChallenge.length == 0);
     }
 
     @Test(expected = SaslAuthenticationException.class)
     public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
-        saslServer.evaluateResponse(clientInitialResponseText(USER + "x").getBytes(StandardCharsets.UTF_8));
+        saslServer.evaluateResponse(clientInitialResponse(USER + "x"));
     }
 
     @Test
     public void illegalToken() throws Exception {
-        byte[] bytes = saslServer
-                .evaluateResponse((clientInitialResponseText(null) + "AB").getBytes(StandardCharsets.UTF_8));
+        byte[] bytes = saslServer.evaluateResponse(clientInitialResponse(null, true));
         String challenge = new String(bytes, StandardCharsets.UTF_8);
         assertEquals("{\"status\":\"invalid_token\"}", challenge);
     }
 
-    private String clientInitialResponseText(String authorizationId)
+    private byte[] clientInitialResponse(String authorizationId)
+            throws OAuthBearerConfigException, IOException, UnsupportedCallbackException,
LoginException {
+        return clientInitialResponse(authorizationId, false);
+    }
+
+    private byte[] clientInitialResponse(String authorizationId, boolean illegalToken)
             throws OAuthBearerConfigException, IOException, UnsupportedCallbackException,
LoginException {
         OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
         LOGIN_CALLBACK_HANDLER.handle(new Callback[] {callback});
         OAuthBearerToken token = callback.token();
         String compactSerialization = token.value();
-        String clientInitialResponseText = "n,"
-                + (authorizationId == null || authorizationId.isEmpty() ? "" : "a=" + authorizationId)
+ ",auth=Bearer "
-                + compactSerialization;
-        return clientInitialResponseText;
+        String tokenValue = compactSerialization + (illegalToken ? "AB" : "");
+        return new OAuthBearerClientInitialResponse(tokenValue, authorizationId, Collections.emptyMap()).toBytes();
     }
 }


Mime
View raw message