kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
Date Sat, 02 Mar 2019 15:58:53 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d56f14  KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
0d56f14 is described below

commit 0d56f1413557adabc736cae2dffcdc56a620403e
Author: Mickael Maison <mickael.maison@gmail.com>
AuthorDate: Sat Mar 2 21:28:30 2019 +0530

    KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
    
    Author: Mickael Maison <mickael.maison@gmail.com>
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #6324 from mimaison/sasl-authenticate
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  8 +-
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../common/requests/SaslAuthenticateRequest.java   | 63 ++++++----------
 .../common/requests/SaslAuthenticateResponse.java  | 86 +++++-----------------
 .../authenticator/SaslClientAuthenticator.java     |  7 +-
 .../authenticator/SaslServerAuthenticator.java     | 24 ++++--
 .../java/org/apache/kafka/common/utils/Utils.java  |  9 +++
 .../kafka/common/requests/RequestResponseTest.java | 11 ++-
 .../authenticator/SaslAuthenticatorTest.java       |  4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  9 ++-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  3 +-
 11 files changed, 98 insertions(+), 128 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 19bf6f0..0a19939 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.protocol.types.Schema;
@@ -97,8 +99,6 @@ import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
-import org.apache.kafka.common.requests.SaslAuthenticateRequest;
-import org.apache.kafka.common.requests.SaslAuthenticateResponse;
 import org.apache.kafka.common.requests.StopReplicaRequest;
 import org.apache.kafka.common.requests.StopReplicaResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
@@ -180,8 +180,8 @@ public enum ApiKeys {
             AlterReplicaLogDirsResponse.schemaVersions()),
     DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
             DescribeLogDirsResponse.schemaVersions()),
-    SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(),
-            SaslAuthenticateResponse.schemaVersions()),
+    SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS,
+            SaslAuthenticateResponseData.SCHEMAS),
     CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
             CreatePartitionsResponse.schemaVersions()),
     CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 959379c..712d732 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -143,7 +143,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case DESCRIBE_LOG_DIRS:
                 return new DescribeLogDirsResponse(struct);
             case SASL_AUTHENTICATE:
-                return new SaslAuthenticateResponse(struct);
+                return new SaslAuthenticateResponse(struct, version);
             case CREATE_PARTITIONS:
                 return new CreatePartitionsResponse(struct);
             case CREATE_DELEGATION_TOKEN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index b6da7ea..d136e78 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -16,15 +16,13 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
 
 /**
  * Request from SASL client containing client SASL authentication token as defined by the
@@ -35,31 +33,18 @@ import static org.apache.kafka.common.protocol.types.Type.BYTES;
  * brokers will send SaslHandshake request v0 followed by SASL tokens without the Kafka request
headers.
  */
 public class SaslAuthenticateRequest extends AbstractRequest {
-    private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
-
-    private static final Schema SASL_AUTHENTICATE_REQUEST_V0 = new Schema(
-            new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication bytes from client
as defined by the SASL mechanism."));
-
-    /* v1 request is the same as v0; session_lifetime_ms has been added to the response */
-    private static final Schema SASL_AUTHENTICATE_REQUEST_V1 = SASL_AUTHENTICATE_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{SASL_AUTHENTICATE_REQUEST_V0, SASL_AUTHENTICATE_REQUEST_V1};
-    }
-
-    private final ByteBuffer saslAuthBytes;
 
     public static class Builder extends AbstractRequest.Builder<SaslAuthenticateRequest>
{
-        private final ByteBuffer saslAuthBytes;
+        private final SaslAuthenticateRequestData data;
 
-        public Builder(ByteBuffer saslAuthBytes) {
+        public Builder(SaslAuthenticateRequestData data) {
             super(ApiKeys.SASL_AUTHENTICATE);
-            this.saslAuthBytes = saslAuthBytes;
+            this.data = data;
         }
 
         @Override
         public SaslAuthenticateRequest build(short version) {
-            return new SaslAuthenticateRequest(saslAuthBytes, version);
+            return new SaslAuthenticateRequest(data, version);
         }
 
         @Override
@@ -70,35 +55,35 @@ public class SaslAuthenticateRequest extends AbstractRequest {
         }
     }
 
-    public SaslAuthenticateRequest(ByteBuffer saslAuthBytes) {
-        this(saslAuthBytes, ApiKeys.SASL_AUTHENTICATE.latestVersion());
+    private final SaslAuthenticateRequestData data;
+    private final short version;
+
+    public SaslAuthenticateRequest(SaslAuthenticateRequestData data) {
+        this(data, ApiKeys.SASL_AUTHENTICATE.latestVersion());
     }
 
-    public SaslAuthenticateRequest(ByteBuffer saslAuthBytes, short version) {
+    public SaslAuthenticateRequest(SaslAuthenticateRequestData data, short version) {
         super(ApiKeys.SASL_AUTHENTICATE, version);
-        this.saslAuthBytes = saslAuthBytes;
+        this.data = data;
+        this.version = version;
     }
 
     public SaslAuthenticateRequest(Struct struct, short version) {
         super(ApiKeys.SASL_AUTHENTICATE, version);
-        saslAuthBytes = struct.getBytes(SASL_AUTH_BYTES_KEY_NAME);
+        this.data = new SaslAuthenticateRequestData(struct, version);
+        this.version = version;
     }
 
-    public ByteBuffer saslAuthBytes() {
-        return saslAuthBytes;
+    public SaslAuthenticateRequestData data() {
+        return data;
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-            case 1:
-                return new SaslAuthenticateResponse(Errors.forException(e), e.getMessage());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid.
Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ApiKeys.SASL_AUTHENTICATE.latestVersion()));
-        }
+        SaslAuthenticateResponseData response = new SaslAuthenticateResponseData()
+                .setErrorCode(ApiError.fromThrowable(e).error().code())
+                .setErrorMessage(e.getMessage());
+        return new SaslAuthenticateResponse(response);
     }
 
     public static SaslAuthenticateRequest parse(ByteBuffer buffer, short version) {
@@ -107,9 +92,7 @@ public class SaslAuthenticateRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.requestSchema(version()));
-        struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
-        return struct;
+        return data.toStruct(version);
     }
 }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index 402d04d..fc8832a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -16,111 +16,63 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
 
 /**
  * Response from SASL server which for a SASL challenge as defined by the SASL protocol
  * for the mechanism configured for the client.
  */
 public class SaslAuthenticateResponse extends AbstractResponse {
-    private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
-    private static final String SESSION_LIFETIME_MS = "session_lifetime_ms";
-
-    private static final Schema SASL_AUTHENTICATE_RESPONSE_V0 = new Schema(
-            ERROR_CODE,
-            ERROR_MESSAGE,
-            new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication bytes from server
as defined by the SASL mechanism."));
 
-    private static final Schema SASL_AUTHENTICATE_RESPONSE_V1 = new Schema(
-            ERROR_CODE,
-            ERROR_MESSAGE,
-            new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication bytes from server
as defined by the SASL mechanism."),
-            new Field(SESSION_LIFETIME_MS, INT64, "Number of milliseconds after which only
re-authentication over the existing connection to create a new session can occur."));
+    private final SaslAuthenticateResponseData data;
 
-    public static Schema[] schemaVersions() {
-        return new Schema[]{SASL_AUTHENTICATE_RESPONSE_V0, SASL_AUTHENTICATE_RESPONSE_V1};
+    public SaslAuthenticateResponse(SaslAuthenticateResponseData data) {
+        this.data = data;
     }
 
-    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
-    private final ByteBuffer saslAuthBytes;
+    public SaslAuthenticateResponse(Struct struct, short version) {
+        this.data = new SaslAuthenticateResponseData(struct, version);
+    }
 
     /**
      * Possible error codes:
      *   SASL_AUTHENTICATION_FAILED(57) : Authentication failed
      */
-    private final Errors error;
-    private final String errorMessage;
-    private final long sessionLifetimeMs;
-
-    public SaslAuthenticateResponse(Errors error, String errorMessage) {
-        this(error, errorMessage, EMPTY_BUFFER);
-    }
-
-    public SaslAuthenticateResponse(Errors error, String errorMessage, ByteBuffer saslAuthBytes)
{
-        this(error, errorMessage, saslAuthBytes, 0L);
-    }
-
-    public SaslAuthenticateResponse(Errors error, String errorMessage, ByteBuffer saslAuthBytes,
long sessionLifetimeMs) {
-        this.error = error;
-        this.errorMessage = errorMessage;
-        this.saslAuthBytes = saslAuthBytes;
-        this.sessionLifetimeMs = sessionLifetimeMs;
-    }
-
-    public SaslAuthenticateResponse(Struct struct) {
-        error = Errors.forCode(struct.get(ERROR_CODE));
-        errorMessage = struct.get(ERROR_MESSAGE);
-        saslAuthBytes = struct.getBytes(SASL_AUTH_BYTES_KEY_NAME);
-        sessionLifetimeMs = struct.hasField(SESSION_LIFETIME_MS) ? struct.getLong(SESSION_LIFETIME_MS).longValue()
: 0L;
-    }
-
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
-    public String errorMessage() {
-        return errorMessage;
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
     }
 
-    public ByteBuffer saslAuthBytes() {
-        return saslAuthBytes;
+    public String errorMessage() {
+        return data.errorMessage();
     }
 
     public long sessionLifetimeMs() {
-        return sessionLifetimeMs;
+        return data.sessionLifetimeMs();
     }
 
-    @Override
-    public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
+    public byte[] saslAuthBytes() {
+        return data.authBytes();
     }
 
     @Override
     public Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
-        struct.set(ERROR_CODE, error.code());
-        struct.set(ERROR_MESSAGE, errorMessage);
-        struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
-        if (version > 0)
-            struct.set(SESSION_LIFETIME_MS, sessionLifetimeMs);
-        return struct;
+        return data.toStruct(version);
     }
 
     public static SaslAuthenticateResponse parse(ByteBuffer buffer, short version) {
-        return new SaslAuthenticateResponse(ApiKeys.SASL_AUTHENTICATE.parseResponse(version,
buffer));
+        return new SaslAuthenticateResponse(ApiKeys.SASL_AUTHENTICATE.parseResponse(version,
buffer), version);
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index cc26336..3133f44 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.NetworkSend;
@@ -373,7 +374,9 @@ public class SaslClientAuthenticator implements Authenticator {
             if (saslToken != null) {
                 ByteBuffer tokenBuf = ByteBuffer.wrap(saslToken);
                 if (saslAuthenticateVersion != DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER) {
-                    SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(tokenBuf).build(saslAuthenticateVersion);
+                    SaslAuthenticateRequestData data = new SaslAuthenticateRequestData()
+                            .setAuthBytes(tokenBuf.array());
+                    SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build(saslAuthenticateVersion);
                     tokenBuf = request.serialize(nextRequestHeader(ApiKeys.SASL_AUTHENTICATE,
saslAuthenticateVersion));
                 }
                 send(new NetworkSend(node, tokenBuf));
@@ -445,7 +448,7 @@ public class SaslClientAuthenticator implements Authenticator {
                 long sessionLifetimeMs = response.sessionLifetimeMs();
                 if (sessionLifetimeMs > 0L)
                     reauthInfo.positiveSessionLifetimeMs = sessionLifetimeMs;
-                return Utils.readBytes(response.saslAuthBytes());
+                return Utils.copyArray(response.saslAuthBytes());
             } else
                 return null;
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index ccd94fc..7aca177 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.ChannelBuilders;
@@ -89,7 +90,6 @@ public class SaslServerAuthenticator implements Authenticator {
     // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
     static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
-    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 
     /**
      * The internal state transitions for initial authentication of a channel on the
@@ -448,17 +448,25 @@ public class SaslServerAuthenticator implements Authenticator {
             SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest) requestAndSize.request;
 
             try {
-                byte[] responseToken = saslServer.evaluateResponse(Utils.readBytes(saslAuthenticateRequest.saslAuthBytes()));
+                byte[] responseToken = saslServer.evaluateResponse(
+                        Utils.copyArray(saslAuthenticateRequest.data().authBytes()));
                 if (reauthInfo.reauthenticating() && saslServer.isComplete())
                     reauthInfo.ensurePrincipalUnchanged(principal());
                 // For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE
request even if token is empty.
-                ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
+                byte[] responseBytes = responseToken == null ? new byte[0] : responseToken;
                 long sessionLifetimeMs = !saslServer.isComplete() ? 0L
                         : reauthInfo.calcCompletionTimesAndReturnSessionLifetimeMs();
-                sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE,
null, responseBuf, sessionLifetimeMs));
+                sendKafkaResponse(requestContext, new SaslAuthenticateResponse(
+                        new SaslAuthenticateResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setAuthBytes(responseBytes)
+                        .setSessionLifetimeMs(sessionLifetimeMs)));
             } catch (SaslAuthenticationException e) {
                 buildResponseOnAuthenticateFailure(requestContext,
-                        new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
+                        new SaslAuthenticateResponse(
+                                new SaslAuthenticateResponseData()
+                                .setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code())
+                                .setErrorMessage(e.getMessage())));
                 throw e;
             } catch (SaslException e) {
                 KerberosError kerberosError = KerberosError.fromException(e);
@@ -471,8 +479,10 @@ public class SaslServerAuthenticator implements Authenticator {
                     String errorMessage = "Authentication failed during "
                             + reauthInfo.authenticationOrReauthenticationText()
                             + " due to invalid credentials with SASL mechanism " + saslMechanism;
-                    sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
-                            errorMessage));
+                    sendKafkaResponse(requestContext, new SaslAuthenticateResponse(
+                            new SaslAuthenticateResponseData()
+                            .setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code())
+                            .setErrorMessage(errorMessage)));
                     throw new SaslAuthenticationException(errorMessage, e);
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 53417d2..5d2a5cf 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -275,6 +275,15 @@ public final class Utils {
     }
 
     /**
+     * Returns a copy of src byte array
+     * @param src The byte array to copy
+     * @return The copy
+     */
+    public static byte[] copyArray(byte[] src) {
+        return Arrays.copyOf(src, src.length);
+    }
+
+    /**
      * Check that the parameter t is not null
      *
      * @param t The object to check
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f1e2063..5d60086 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -47,6 +47,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.Partiti
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.network.ListenerName;
@@ -1029,11 +1031,16 @@ public class RequestResponseTest {
     }
 
     private SaslAuthenticateRequest createSaslAuthenticateRequest() {
-        return new SaslAuthenticateRequest(ByteBuffer.wrap(new byte[0]));
+        SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(new
byte[0]);
+        return new SaslAuthenticateRequest(data);
     }
 
     private SaslAuthenticateResponse createSaslAuthenticateResponse() {
-        return new SaslAuthenticateResponse(Errors.NONE, null, ByteBuffer.wrap(new byte[0]),
Long.MAX_VALUE);
+        SaslAuthenticateResponseData data = new SaslAuthenticateResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setAuthBytes(new byte[0])
+                .setSessionLifetimeMs(Long.MAX_VALUE);
+        return new SaslAuthenticateResponse(data);
     }
 
     private ApiVersionsRequest createApiVersionRequest() {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 97d114f..5cfecf8 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.network.CertStores;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -1796,7 +1797,8 @@ public class SaslAuthenticatorTest {
         String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD;
         ByteBuffer authBuf = ByteBuffer.wrap(authString.getBytes("UTF-8"));
         if (enableSaslAuthenticateHeader) {
-            SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(authBuf).build();
+            SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(authBuf.array());
+            SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build();
             sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_AUTHENTICATE, request);
         } else {
             selector.send(new NetworkSend(node, authBuf));
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2361ee5..faf338e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,6 +49,7 @@ import org.apache.kafka.common.message.{CreateTopicsResponseData, DescribeGroups
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
 import org.apache.kafka.common.message.LeaveGroupResponseData
+import org.apache.kafka.common.message.SaslAuthenticateResponseData
 import org.apache.kafka.common.message.SaslHandshakeResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
@@ -1402,13 +1403,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    val responseData = new SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code())
+    val responseData = new SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
     sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
   }
 
   def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
-    sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(Errors.ILLEGAL_SASL_STATE,
-      "SaslAuthenticate request received after successful authentication"))
+    val responseData = new SaslAuthenticateResponseData()
+      .setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
+      .setErrorMessage("SaslAuthenticate request received after successful authentication")
+    sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(responseData))
   }
 
   def handleApiVersionsRequest(request: RequestChannel.Request) {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index b0eb7cb..1c8656d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsR
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter,
ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
+import org.apache.kafka.common.message.SaslAuthenticateRequestData
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
@@ -276,7 +277,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new SaslHandshakeRequest.Builder(new SaslHandshakeRequestData().setMechanism("PLAIN"))
 
         case ApiKeys.SASL_AUTHENTICATE =>
-          new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new Array[Byte](0)))
+          new SaslAuthenticateRequest.Builder(new SaslAuthenticateRequestData().setAuthBytes(new
Array[Byte](0)))
 
         case ApiKeys.API_VERSIONS =>
           new ApiVersionsRequest.Builder


Mime
View raw message