kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3600; Use ApiVersions to check if broker supports required api versions
Date Sat, 17 Dec 2016 12:26:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e156f5179 -> d6b0b520f


KAFKA-3600; Use ApiVersions to check if broker supports required api versions

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Colin P. Mccabe <cmccabe@confluent.io>, Dana Powers <dana.powers@gmail.com>, Gwen Shapira <cshapi@gmail.com>, Grant Henke <granthenke@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1251 from SinghAsDev/KAFKA-3600


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6b0b520
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6b0b520
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6b0b520

Branch: refs/heads/trunk
Commit: d6b0b520fc0755fa560af84f5fc4c9f86699739c
Parents: e156f51
Author: Ashish Singh <asingh@cloudera.com>
Authored: Sat Dec 17 04:21:54 2016 -0800
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Dec 17 04:24:15 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java   |  13 ++
 .../kafka/clients/ClusterConnectionStates.java  |  71 ++++----
 .../apache/kafka/clients/ConnectionState.java   |   7 +-
 .../kafka/clients/ManualMetadataUpdater.java    |   4 +-
 .../apache/kafka/clients/MetadataUpdater.java   |   4 +-
 .../org/apache/kafka/clients/NetworkClient.java | 172 +++++++++++++++----
 .../kafka/clients/consumer/KafkaConsumer.java   |  19 +-
 .../kafka/clients/producer/KafkaProducer.java   |  15 +-
 .../kafka/common/protocol/ProtoUtils.java       |   6 +
 .../common/requests/ApiVersionsRequest.java     |   2 +-
 .../common/requests/ApiVersionsResponse.java    |  17 +-
 .../authenticator/SaslClientAuthenticator.java  |   3 +
 .../authenticator/SaslServerAuthenticator.java  |   2 +-
 .../NetworkClientApiVersionsCheckTest.java      |  89 ++++++++++
 .../apache/kafka/clients/NetworkClientTest.java |  73 ++++++--
 .../org/apache/kafka/test/DelayedReceive.java   |  40 +++++
 .../org/apache/kafka/test/MockSelector.java     |  20 ++-
 .../runtime/distributed/WorkerGroupMember.java  |  16 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |   2 +-
 .../unit/kafka/server/ApiVersionsTest.scala     |   4 +-
 tests/kafkatest/version.py                      |   7 +-
 22 files changed, 484 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 2672b70..be663ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -15,6 +15,7 @@ package org.apache.kafka.clients;
 import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
@@ -22,10 +23,13 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.LoginType;
 import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,4 +89,13 @@ public class ClientUtils {
         return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);
     }
 
+    public static Collection<ApiVersionsResponse.ApiVersion> buildExpectedApiVersions(Collection<ApiKeys> apiKeys) {
+        List<ApiVersionsResponse.ApiVersion> expectedApiVersions = new ArrayList<>();
+        for (ApiKeys apiKey : apiKeys)
+            expectedApiVersions.add(
+                    // once backwards client compatibility is added, expected min API version for an API should be set to its min version
+                    new ApiVersionsResponse.ApiVersion(
+                            apiKey.id, Protocol.CURR_VERSION[apiKey.id], Protocol.CURR_VERSION[apiKey.id]));
+        return expectedApiVersions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 6b90ab8..350f5a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -31,8 +31,8 @@ final class ClusterConnectionStates {
     /**
      * Return true iff we can currently initiate a new connection. This will be the case if we are not
      * connected and haven't been connected for at least the minimum reconnection backoff period.
-     * @param id The connection id to check
-     * @param now The current time in MS
+     * @param id the connection id to check
+     * @param now the current time in MS
      * @return true if we can initiate a new connection
      */
     public boolean canConnect(String id, long now) {
@@ -44,9 +44,9 @@ final class ClusterConnectionStates {
     }
 
     /**
-     * Return true if we are disconnected from the given node and can't re-establish a connection yet
-     * @param id The connection to check
-     * @param now The current time in ms
+     * Return true if we are disconnected from the given node and can't re-establish a connection yet.
+     * @param id the connection to check
+     * @param now the current time in ms
      */
     public boolean isBlackedOut(String id, long now) {
         NodeConnectionState state = nodeState.get(id);
@@ -60,8 +60,8 @@ final class ClusterConnectionStates {
      * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
      * connections.
-     * @param id The connection to check
-     * @param now The current time in ms
+     * @param id the connection to check
+     * @param now the current time in ms
      */
     public long connectionDelay(String id, long now) {
         NodeConnectionState state = nodeState.get(id);
@@ -87,40 +87,49 @@ final class ClusterConnectionStates {
 
     /**
      * Enter the connecting state for the given connection.
-     * @param id The id of the connection
-     * @param now The current time.
+     * @param id the id of the connection
+     * @param now the current time
      */
     public void connecting(String id, long now) {
         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));
     }
 
     /**
-     * Return true iff a specific connection is connected
-     * @param id The id of the connection to check
+     * Enter the disconnected state for the given node.
+     * @param id the connection we have disconnected
+     * @param now the current time
      */
-    public boolean isConnected(String id) {
-        NodeConnectionState state = nodeState.get(id);
-        return state != null && state.state == ConnectionState.CONNECTED;
+    public void disconnected(String id, long now) {
+        NodeConnectionState nodeState = nodeState(id);
+        nodeState.state = ConnectionState.DISCONNECTED;
+        nodeState.lastConnectAttemptMs = now;
     }
 
     /**
-     * Enter the connected state for the given connection
-     * @param id The connection identifier
+     * Enter the checking_api_versions state for the given node.
+     * @param id the connection identifier
      */
-    public void connected(String id) {
+    public void checkingApiVersions(String id) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.CONNECTED;
+        nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
     }
 
     /**
-     * Enter the disconnected state for the given node
-     * @param id The connection we have disconnected
-     * @param now The current time
+     * Enter the ready state for the given node.
+     * @param id the connection identifier
      */
-    public void disconnected(String id, long now) {
+    public void ready(String id) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
-        nodeState.lastConnectAttemptMs = now;
+        nodeState.state = ConnectionState.READY;
+    }
+
+    /**
+     * Return true if the connection is ready.
+     * @param id the connection identifier
+     */
+    public boolean isReady(String id) {
+        NodeConnectionState state = nodeState.get(id);
+        return state != null && state.state == ConnectionState.READY;
     }
 
     /**
@@ -128,24 +137,24 @@ final class ClusterConnectionStates {
      * is the impact on `connectionDelay`: it will be 0 after this call whereas `reconnectBackoffMs` will be taken
      * into account after `disconnected` is called.
      *
-     * @param id The connection to remove
+     * @param id the connection to remove
      */
     public void remove(String id) {
         nodeState.remove(id);
     }
     
     /**
-     * Get the state of a given connection
-     * @param id The id of the connection
-     * @return The state of our connection
+     * Get the state of a given connection.
+     * @param id the id of the connection
+     * @return the state of our connection
      */
     public ConnectionState connectionState(String id) {
         return nodeState(id).state;
     }
     
     /**
-     * Get the state of a given node
-     * @param id The connection to fetch the state for
+     * Get the state of a given node.
+     * @param id the connection to fetch the state for
      */
     private NodeConnectionState nodeState(String id) {
         NodeConnectionState state = this.nodeState.get(id);
@@ -155,7 +164,7 @@ final class ClusterConnectionStates {
     }
     
     /**
-     * The state of our connection to a node
+     * The state of our connection to a node.
      */
     private static class NodeConnectionState {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
index 3867f8e..288230b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
@@ -14,7 +14,12 @@ package org.apache.kafka.clients;
 
 /**
  * The states of a node connection
+ *
+ * DISCONNECTED: connection has not been successfully established yet
+ * CONNECTING: connection is in progress
+ * CHECKING_API_VERSIONS: connection has been established and api versions check is in progress. Failure of this check will cause the connection to close
+ * READY: connection is ready to send requests
  */
 public enum ConnectionState {
-    DISCONNECTED, CONNECTING, CONNECTED
+    DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index 1c9fa79..6486d15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -14,7 +14,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
 import java.util.ArrayList;
@@ -66,7 +66,7 @@ public class ManualMetadataUpdater implements MetadataUpdater {
     }
 
     @Override
-    public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body) {
+    public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
         // Do nothing
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 21c50bd..37c77bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -14,7 +14,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
 import java.util.List;
@@ -64,7 +64,7 @@ interface MetadataUpdater {
      * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
      * requests with special handling for completed receives of such requests.
      */
-    void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body);
+    void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse);
 
     /**
      * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 124810d..c7728b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Selectable;
@@ -22,6 +23,8 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
@@ -35,9 +38,14 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -48,6 +56,11 @@ import java.util.Random;
 public class NetworkClient implements KafkaClient {
 
     private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
+    private static final Metadata NO_METADATA = null;
+    private static final MetadataUpdater NO_METADATA_UPDATER = null;
+
+    /* apis used by this client. If this is null, client won't perform api version check against connecting brokers */
+    private final Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions;
 
     /* the selector used to perform network i/o */
     private final Selectable selector;
@@ -82,6 +95,24 @@ public class NetworkClient implements KafkaClient {
 
     private final Time time;
 
+    private final Map<Integer, Collection<ApiVersionsResponse.ApiVersion>> nodeApiVersions = new HashMap<>();
+
+    private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>();
+
+    public NetworkClient(Selectable selector,
+                         Metadata metadata,
+                         String clientId,
+                         int maxInFlightRequestsPerConnection,
+                         long reconnectBackoffMs,
+                         int socketSendBuffer,
+                         int socketReceiveBuffer,
+                         int requestTimeoutMs,
+                         Time time,
+                         Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions) {
+        this(NO_METADATA_UPDATER, metadata, selector, clientId, maxInFlightRequestsPerConnection,
+                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, requiredApiVersions);
+    }
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -91,8 +122,22 @@ public class NetworkClient implements KafkaClient {
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time) {
-        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
+        this(NO_METADATA_UPDATER, metadata, selector, clientId, maxInFlightRequestsPerConnection,
+                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, null);
+    }
+
+    public NetworkClient(Selectable selector,
+                         MetadataUpdater metadataUpdater,
+                         String clientId,
+                         int maxInFlightRequestsPerConnection,
+                         long reconnectBackoffMs,
+                         int socketSendBuffer,
+                         int socketReceiveBuffer,
+                         int requestTimeoutMs,
+                         Time time,
+                         Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions) {
+        this(metadataUpdater, NO_METADATA, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
+                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, requiredApiVersions);
     }
 
     public NetworkClient(Selectable selector,
@@ -104,8 +149,8 @@ public class NetworkClient implements KafkaClient {
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time) {
-        this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
+        this(metadataUpdater, NO_METADATA, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
+                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, null);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -117,7 +162,10 @@ public class NetworkClient implements KafkaClient {
                           int socketSendBuffer,
                           int socketReceiveBuffer,
                           int requestTimeoutMs,
-                          Time time) {
+                          Time time,
+                          Collection<ApiVersionsResponse.ApiVersion> requiredApiVersions) {
+
+        this.requiredApiVersions = requiredApiVersions;
 
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
@@ -174,7 +222,7 @@ public class NetworkClient implements KafkaClient {
     public void close(String nodeId) {
         selector.close(nodeId);
         for (InFlightRequest request : inFlightRequests.clearAll(nodeId))
-            if (request.isInternalMetadataRequest)
+            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
                 metadataUpdater.handleDisconnection(request.destination);
         connectionStates.remove(nodeId);
     }
@@ -226,7 +274,16 @@ public class NetworkClient implements KafkaClient {
      * @param node The node
      */
     private boolean canSendRequest(String node) {
-        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
+        return connectionStates.isReady(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
+    }
+
+    /**
+     * Are we connected and able to send API versions request to the given connection?
+     *
+     * @param node The node
+     */
+    private boolean canSendApiVersionsRequest(String node) {
+        return this.requiredApiVersions != null && nodesNeedingApiVersionsFetch.contains(node);
     }
 
     /**
@@ -245,9 +302,12 @@ public class NetworkClient implements KafkaClient {
         doSend(clientRequest, true, now);
     }
 
-    private void doSend(ClientRequest request, boolean isInternalMetadataRequest, long now) {
+    private void doSend(ClientRequest request, boolean isInternalRequest, long now) {
         String nodeId = request.destination();
-        if (!canSendRequest(nodeId))
+        if (request.header().apiKey() == ApiKeys.API_VERSIONS.id) {
+            if (!canSendApiVersionsRequest(nodeId))
+                throw new IllegalStateException("Attempt to send API Versions request to node " + nodeId + " which is not ready.");
+        } else if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
 
         Send send = request.body().toSend(nodeId, request.header());
@@ -257,7 +317,7 @@ public class NetworkClient implements KafkaClient {
                 request.destination(),
                 request.callback(),
                 request.expectResponse(),
-                isInternalMetadataRequest,
+                isInternalRequest,
                 send,
                 now);
 
@@ -289,7 +349,7 @@ public class NetworkClient implements KafkaClient {
         handleCompletedSends(responses, updatedNow);
         handleCompletedReceives(responses, updatedNow);
         handleDisconnections(responses, updatedNow);
-        handleConnections();
+        handleConnections(updatedNow);
         handleTimedOutRequests(responses, updatedNow);
 
         // invoke callbacks
@@ -378,7 +438,7 @@ public class NetworkClient implements KafkaClient {
             int idx = (offset + i) % nodes.size();
             Node node = nodes.get(idx);
             int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
-            if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
+            if (currInflight == 0 && this.connectionStates.isReady(node.idString())) {
                 // if we find an established connection with no in-flight requests we can stop right away
                 return node;
             } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
@@ -410,9 +470,11 @@ public class NetworkClient implements KafkaClient {
      */
     private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
         connectionStates.disconnected(nodeId, now);
+        nodeApiVersions.remove(nodeId);
+        nodesNeedingApiVersionsFetch.remove(nodeId);
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
             log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
-            if (request.isInternalMetadataRequest)
+            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
                 metadataUpdater.handleDisconnection(request.destination);
             else
                 responses.add(request.disconnected(now));
@@ -468,13 +530,36 @@ public class NetworkClient implements KafkaClient {
             String source = receive.source();
             InFlightRequest req = inFlightRequests.completeNext(source);
             AbstractResponse body = parseResponse(receive.payload(), req.header);
-            if (req.isInternalMetadataRequest)
-                metadataUpdater.handleCompletedMetadataResponse(req.header, now, body);
+            log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
+            if (req.isInternalRequest && body instanceof MetadataResponse)
+                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
+            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
+                handleApiVersionsResponse(req, (ApiVersionsResponse) body);
             else
                 responses.add(req.completed(body, now));
         }
     }
 
+    private void handleApiVersionsResponse(InFlightRequest req, ApiVersionsResponse apiVersionsResponse) {
+        final String node = req.destination;
+        for (ApiVersionsResponse.ApiVersion requiredApiVersion : requiredApiVersions) {
+            final ApiVersionsResponse.ApiVersion supportedApiVersion = apiVersionsResponse.apiVersion(requiredApiVersion.apiKey);
+            if (supportedApiVersion == null) {
+                close(node);
+                throw new KafkaException("Node " + req.destination + " does not support Api " + requiredApiVersion.apiKey + ".");
+            }
+            if (supportedApiVersion.maxVersion < requiredApiVersion.minVersion || supportedApiVersion.minVersion > requiredApiVersion.maxVersion) {
+                close(node);
+                throw new KafkaException("Node " + req.destination + " does not support required versions for Api " + requiredApiVersion.apiKey + "." +
+                        " Supported versions: " + "[" + supportedApiVersion.minVersion + ", " + supportedApiVersion.maxVersion + "]" + "," +
+                        " required versions: " + "[" + requiredApiVersion.minVersion + ", " + requiredApiVersion.maxVersion + "]" + ".");
+            }
+        }
+        nodeApiVersions.put(Integer.parseInt(node), apiVersionsResponse.apiVersions());
+        this.connectionStates.ready(node);
+        log.debug("Recorded API versions for node {}: {}", node, apiVersionsResponse.apiVersions());
+    }
+
     /**
      * Handle any disconnected connections
      *
@@ -494,10 +579,35 @@ public class NetworkClient implements KafkaClient {
     /**
      * Record any newly completed connections
      */
-    private void handleConnections() {
+    private void handleConnections(long now) {
         for (String node : this.selector.connected()) {
             log.debug("Completed connection to node {}", node);
-            this.connectionStates.connected(node);
+            // Though the node is connected, we might not still be able to send requests. For instance,
+            // in case of SSL connection, SSL handshake happens after connection is established.
+            if (this.requiredApiVersions == null) {
+                this.connectionStates.ready(node);
+            } else {
+                this.connectionStates.checkingApiVersions(node);
+                nodesNeedingApiVersionsFetch.add(node);
+            }
+        }
+        maybeInitiateApiVersionsFetch(now);
+    }
+
+    private void maybeInitiateApiVersionsFetch(long now) {
+        if (this.requiredApiVersions == null)
+            return;
+
+        Iterator<String> iter = nodesNeedingApiVersionsFetch.iterator();
+        while (iter.hasNext()) {
+            String node = iter.next();
+            if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
+                log.debug("Initiating API versions fetch from node {}.", node);
+                ClientRequest clientRequest = new ClientRequest(node, now, true,
+                        nextRequestHeader(ApiKeys.API_VERSIONS), ApiVersionsRequest.API_VERSIONS_REQUEST, null);
+                doSend(clientRequest, true, now);
+                iter.remove();
+            }
         }
     }
 
@@ -590,35 +700,29 @@ public class NetworkClient implements KafkaClient {
         }
 
         @Override
-        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse response) {
-            if (!(response instanceof MetadataResponse))
-                throw new IllegalStateException("Unexpected response type in metadata handler: " + response);
-            handleMetadataResponse(requestHeader, (MetadataResponse) response, now);
-        }
-
-        @Override
-        public void requestUpdate() {
-            this.metadata.requestUpdate();
-        }
-
-        private void handleMetadataResponse(RequestHeader header, MetadataResponse response, long now) {
+        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
             this.metadataFetchInProgress = false;
             Cluster cluster = response.cluster();
             // check if any topics metadata failed to get updated
             Map<String, Errors> errors = response.errors();
             if (!errors.isEmpty())
-                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
+                log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
 
             // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
             // created which means we will get errors and no nodes until it exists
             if (cluster.nodes().size() > 0) {
                 this.metadata.update(cluster, now);
             } else {
-                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+                log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now);
             }
         }
 
+        @Override
+        public void requestUpdate() {
+            this.metadata.requestUpdate();
+        }
+
         /**
          * Return true if there's at least one connection establishment is currently underway
          */
@@ -679,7 +783,7 @@ public class NetworkClient implements KafkaClient {
         final String destination;
         final RequestCompletionHandler callback;
         final boolean expectResponse;
-        final boolean isInternalMetadataRequest; // used to flag metadata fetches which are triggered internally by NetworkClient
+        final boolean isInternalRequest; // used to flag requests which are initiated internally by NetworkClient
         final Send send;
         final long sendTimeMs;
         final long createdTimeMs;
@@ -689,14 +793,14 @@ public class NetworkClient implements KafkaClient {
                                String destination,
                                RequestCompletionHandler callback,
                                boolean expectResponse,
-                               boolean isInternalMetadataRequest,
+                               boolean isInternalRequest,
                                Send send,
                                long sendTimeMs) {
             this.header = header;
             this.destination = destination;
             this.callback = callback;
             this.expectResponse = expectResponse;
-            this.isInternalMetadataRequest = isInternalMetadataRequest;
+            this.isInternalRequest = isInternalRequest;
             this.send = send;
             this.sendTimeMs = sendTimeMs;
             this.createdTimeMs = createdTimeMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5e66a8d..7547c6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
@@ -46,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -505,6 +508,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final long NO_CURRENT_THREAD = -1L;
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.consumer";
+    private static final List<ApiKeys> CONSUMER_APIS = Arrays.asList(
+            ApiKeys.METADATA,
+            ApiKeys.FETCH,
+            ApiKeys.GROUP_COORDINATOR,
+            ApiKeys.HEARTBEAT,
+            ApiKeys.JOIN_GROUP,
+            ApiKeys.LEAVE_GROUP,
+            ApiKeys.LIST_OFFSETS,
+            ApiKeys.OFFSET_COMMIT,
+            ApiKeys.OFFSET_FETCH,
+            ApiKeys.SYNC_GROUP);
+    private static final Collection<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = ClientUtils.buildExpectedApiVersions(CONSUMER_APIS);
 
     private final String clientId;
     private final ConsumerCoordinator coordinator;
@@ -656,7 +671,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
-                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                    time,
+                    EXPECTED_API_VERSIONS);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9dd8459..36cda68 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -39,9 +39,11 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -50,6 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -130,6 +134,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
     private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.producer";
+    /**
+     * APIs used by KafkaProducer
+     */
+    private static final List<ApiKeys> PRODUCER_APIS = Arrays.asList(
+            ApiKeys.METADATA,
+            ApiKeys.PRODUCE);
+    private static final Collection<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = ClientUtils.buildExpectedApiVersions(PRODUCER_APIS);
 
     private String clientId;
     private final Partitioner partitioner;
@@ -307,7 +318,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                    this.requestTimeoutMs, time);
+                    this.requestTimeoutMs,
+                    time,
+                    EXPECTED_API_VERSIONS);
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index 98befdc..c4d2cc6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -40,6 +40,12 @@ public class ProtoUtils {
         return Protocol.CURR_VERSION[apiKey];
     }
 
+    public static short oldestVersion(int apiKey) {
+        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length)
+            throw new IllegalArgumentException("Invalid api key: " + apiKey);
+        return Protocol.MIN_VERSIONS[apiKey];
+    }
+
     public static Schema requestSchema(int apiKey, int version) {
         return schemaFor(Protocol.REQUESTS, apiKey, version);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index d9ef37e..5be5db1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 
 public class ApiVersionsRequest extends AbstractRequest {
-
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.API_VERSIONS.id);
+    public static final ApiVersionsRequest API_VERSIONS_REQUEST = new ApiVersionsRequest();
 
     public ApiVersionsRequest() {
         super(new Struct(CURRENT_SCHEMA));

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 0bf1039..b9f453d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -30,8 +30,8 @@ import java.util.Map;
 public class ApiVersionsResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
-    private static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();
 
+    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String API_VERSIONS_KEY_NAME = "api_versions";
     public static final String API_KEY_NAME = "api_key";
@@ -56,6 +56,15 @@ public class ApiVersionsResponse extends AbstractResponse {
             this.minVersion = minVersion;
             this.maxVersion = maxVersion;
         }
+
+        @Override
+        public String toString() {
+            return "ApiVersion(" +
+                    "apiKey=" + apiKey +
+                    ", minVersion=" + minVersion +
+                    ", maxVersion= " + maxVersion +
+                    ")";
+        }
     }
 
     public ApiVersionsResponse(short errorCode, List<ApiVersion> apiVersions) {
@@ -108,10 +117,6 @@ public class ApiVersionsResponse extends AbstractResponse {
         return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList());
     }
 
-    public static ApiVersionsResponse apiVersionsResponse() {
-        return API_VERSIONS_RESPONSE;
-    }
-
     private static ApiVersionsResponse createApiVersionsResponse() {
         List<ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
@@ -127,4 +132,4 @@ public class ApiVersionsResponse extends AbstractResponse {
         }
         return tempApiIdToApiVersion;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index b9c3b33..c6d9a2f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -152,6 +152,9 @@ public class SaslClientAuthenticator implements Authenticator {
 
         switch (saslState) {
             case SEND_HANDSHAKE_REQUEST:
+                // When multiple versions of SASL_HANDSHAKE_REQUEST are to be supported,
+                // API_VERSIONS_REQUEST must be sent prior to sending SASL_HANDSHAKE_REQUEST to
+                // fetch supported versions.
                 String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
                 currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
                 SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index b193bf2..4d1cab7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -359,7 +359,7 @@ public class SaslServerAuthenticator implements Authenticator {
     }
 
     private void handleApiVersionsRequest(RequestHeader requestHeader) throws IOException, UnsupportedSaslMechanismException {
-        sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
+        sendKafkaResponse(requestHeader, ApiVersionsResponse.API_VERSIONS_RESPONSE);
     }
 
     private void sendKafkaResponse(RequestHeader requestHeader, AbstractResponse response) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java
new file mode 100644
index 0000000..dbe8619
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientApiVersionsCheckTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class NetworkClientApiVersionsCheckTest extends NetworkClientTest {
+    
+    private static final List<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = Collections.singletonList(
+            new ApiVersionsResponse.ApiVersion(ApiKeys.METADATA.id, Protocol.MIN_VERSIONS[ApiKeys.METADATA.id],
+                    Protocol.CURR_VERSION[ApiKeys.METADATA.id]));
+
+    @Override
+    protected List<ApiVersionsResponse.ApiVersion> expectedApiVersions() {
+        return EXPECTED_API_VERSIONS;
+    }
+
+    @Test
+    public void testUnsupportedLesserApiVersions() {
+        unsupportedApiVersionsCheck(expectedApiVersions(), (short) (ProtoUtils.latestVersion(ApiKeys.METADATA.id) + 1), Short.MAX_VALUE, "Node 0 does not support required versions for Api " + ApiKeys.METADATA.id);
+    }
+
+    @Test
+    public void testUnsupportedGreaterApiVersions() {
+        unsupportedApiVersionsCheck(expectedApiVersions(), Short.MIN_VALUE, (short) (ProtoUtils.oldestVersion(ApiKeys.METADATA.id) - 1), "Node 0 does not support required versions for Api " + ApiKeys.METADATA.id);
+    }
+
+    @Test
+    public void testUnsupportedMissingApiVersions() {
+        unsupportedApiVersionsCheck(Collections.<ApiVersionsResponse.ApiVersion>emptyList(), Short.MIN_VALUE, Short.MAX_VALUE, "Node 0 does not support Api " + ApiKeys.METADATA.id);
+    }
+
+    private void unsupportedApiVersionsCheck(final List<ApiVersionsResponse.ApiVersion> expectedApiVersions,
+                                             short minVersion, short maxVersion, String errorMessage) {
+        ResponseHeader responseHeader = new ResponseHeader(0);
+        List<ApiVersionsResponse.ApiVersion> apiVersions = new ArrayList<>();
+        for (ApiVersionsResponse.ApiVersion apiVersion : expectedApiVersions)
+            apiVersions.add(new ApiVersionsResponse.ApiVersion(apiVersion.apiKey, minVersion, maxVersion));
+        ApiVersionsResponse response = new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
+        ByteBuffer buffer = AbstractRequestResponse.serialize(responseHeader, response);
+
+        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+        try {
+            long deadline = time.milliseconds() + TestUtils.DEFAULT_MAX_WAIT_MS;
+            while (time.milliseconds() < deadline && !client.ready(node, time.milliseconds()))
+                client.poll(1, time.milliseconds());
+
+            fail("KafkaException should have been thrown for " + expectedApiVersions + ", minVersion: " + minVersion +
+                    ", maxVersion: " + maxVersion);
+        } catch (KafkaException kex) {
+            assertTrue("Exception containing `" + errorMessage + "` should have been thrown due to ApiVersions " +
+                    "check, but exception message was: " + kex.getMessage(), kex.getMessage().contains(errorMessage));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 0966ee5..9e73901 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -21,14 +21,18 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -36,6 +40,7 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -45,19 +50,42 @@ import static org.junit.Assert.assertTrue;
 
 public class NetworkClientTest {
 
-    private final int requestTimeoutMs = 1000;
-    private MockTime time = new MockTime();
-    private MockSelector selector = new MockSelector(time);
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-    private int nodeId = 1;
-    private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
-    private Node node = cluster.nodes().get(0);
-    private long reconnectBackoffMsTest = 10 * 1000;
-    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, 
-            64 * 1024, 64 * 1024, requestTimeoutMs, time);
-    
-    private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time);
+    protected final int requestTimeoutMs = 1000;
+    protected final MockTime time = new MockTime();
+    protected final MockSelector selector = new MockSelector(time);
+    protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    protected final int nodeId = 1;
+    protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
+    protected final Node node = cluster.nodes().get(0);
+    protected final long reconnectBackoffMsTest = 10 * 1000;
+    protected final NetworkClient client = createNetworkClient();
+
+    private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
+
+    private NetworkClient createNetworkClient() {
+        final Collection<ApiVersionsResponse.ApiVersion> expectedApiVersions = expectedApiVersions();
+        if (expectedApiVersions == null)
+            return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
+                    64 * 1024, 64 * 1024, requestTimeoutMs, time);
+        else
+            return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
+                    64 * 1024, 64 * 1024, requestTimeoutMs, time, expectedApiVersions);
+
+    }
+
+    private NetworkClient createNetworkClientWithStaticNodes() {
+        final Collection<ApiVersionsResponse.ApiVersion> expectedApiVersions = expectedApiVersions();
+        if (expectedApiVersions == null)
+            return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
+                    "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time);
+        else
+            return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
+                    "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time, expectedApiVersions);
+    }
+
+    protected List<ApiVersionsResponse.ApiVersion> expectedApiVersions() {
+        return null;
+    }
 
     @Before
     public void setup() {
@@ -102,11 +130,11 @@ public class NetworkClientTest {
     }
 
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
+        awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
         ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         RequestHeader reqHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
-        awaitReady(networkClient, node);
         networkClient.send(request, time.milliseconds());
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
@@ -126,18 +154,31 @@ public class NetworkClientTest {
         assertEquals("Should be correlated to the original request", request.header(), handler.response.requestHeader());
     }
 
-    private void awaitReady(NetworkClient client, Node node) {
+    private void maybeSetExpectedApiVersionsResponse() {
+        List<ApiVersionsResponse.ApiVersion> expectedApiVersions = expectedApiVersions();
+        if (expectedApiVersions == null)
+            return;
+
+        ResponseHeader responseHeader = new ResponseHeader(0);
+        ByteBuffer buffer = AbstractRequestResponse.serialize(responseHeader, new ApiVersionsResponse(Errors.NONE.code(),
+                expectedApiVersions));
+        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+    }
+
+    protected void awaitReady(NetworkClient client, Node node) {
+        maybeSetExpectedApiVersionsResponse();
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
+        selector.clear();
     }
 
     @Test
     public void testRequestTimeout() {
+        awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
         ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
-        awaitReady(client, node);
         long now = time.milliseconds();
         client.send(request, now);
         // sleeping to make sure that the time since last send is greater than requestTimeOut

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/test/java/org/apache/kafka/test/DelayedReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/DelayedReceive.java b/clients/src/test/java/org/apache/kafka/test/DelayedReceive.java
new file mode 100644
index 0000000..3d982cc
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/DelayedReceive.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.network.NetworkReceive;
+
+/**
+ * Used by MockSelector to allow clients to add responses whose associated requests are added later.
+ */
+public class DelayedReceive {
+    private final String source;
+    private final NetworkReceive receive;
+
+    public DelayedReceive(String source, NetworkReceive receive) {
+        this.source = source;
+        this.receive = receive;
+    }
+
+    public String source() {
+        return source;
+    }
+
+    public NetworkReceive receive() {
+        return receive;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index b39ff7e..6f080b0 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -15,6 +15,7 @@ package org.apache.kafka.test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.kafka.common.network.NetworkReceive;
@@ -34,6 +35,7 @@ public class MockSelector implements Selectable {
     private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
     private final List<String> disconnected = new ArrayList<String>();
     private final List<String> connected = new ArrayList<String>();
+    private final List<DelayedReceive> delayedReceives = new ArrayList<>();
 
     public MockSelector(Time time) {
         this.time = time;
@@ -79,6 +81,16 @@ public class MockSelector implements Selectable {
     public void poll(long timeout) throws IOException {
         this.completedSends.addAll(this.initiatedSends);
         this.initiatedSends.clear();
+        for (Send completedSend : completedSends) {
+            Iterator<DelayedReceive> delayedReceiveIterator = delayedReceives.iterator();
+            while (delayedReceiveIterator.hasNext()) {
+                DelayedReceive delayedReceive = delayedReceiveIterator.next();
+                if (delayedReceive.source().equals(completedSend.destination())) {
+                    completedReceives.add(delayedReceive.receive());
+                    delayedReceiveIterator.remove();
+                }
+            }
+        }
         time.sleep(timeout);
     }
 
@@ -100,6 +112,10 @@ public class MockSelector implements Selectable {
         this.completedReceives.add(receive);
     }
 
+    public void delayedReceive(DelayedReceive receive) {
+        this.delayedReceives.add(receive);
+    }
+
     @Override
     public List<String> disconnected() {
         return disconnected;
@@ -107,7 +123,9 @@ public class MockSelector implements Selectable {
 
     @Override
     public List<String> connected() {
-        return connected;
+        List<String> currentConnected = new ArrayList<>(connected);
+        connected.clear();
+        return currentConnected;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a5213db..d6a134f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -38,6 +40,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +60,14 @@ public class WorkerGroupMember {
 
     private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.connect";
+    private static final List<ApiKeys> WORKER_GROUP_MEMBER_APIS = Arrays.asList(
+            ApiKeys.METADATA,
+            ApiKeys.GROUP_COORDINATOR,
+            ApiKeys.HEARTBEAT,
+            ApiKeys.JOIN_GROUP,
+            ApiKeys.LEAVE_GROUP,
+            ApiKeys.SYNC_GROUP);
+    private static final Collection<ApiVersionsResponse.ApiVersion> EXPECTED_API_VERSIONS = ClientUtils.buildExpectedApiVersions(WORKER_GROUP_MEMBER_APIS);
 
     private final Time time;
     private final String clientId;
@@ -99,7 +111,9 @@ public class WorkerGroupMember {
                     config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
-                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
+                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    time,
+                    EXPECTED_API_VERSIONS);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
             this.coordinator = new WorkerCoordinator(this.client,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a4a1e2a..a89d515 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1133,7 +1133,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
     val responseBody = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
-      ApiVersionsResponse.apiVersionsResponse
+      ApiVersionsResponse.API_VERSIONS_RESPONSE
     else
       ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
     requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 85c1777..68e017f 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 object ApiVersionsRequestTest {
   def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse) {
     assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.", ApiKeys.values.length, apiVersionsResponse.apiVersions.size)
-    for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.apiVersionsResponse.apiVersions.asScala) {
+    for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions.asScala) {
       val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
       assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion)
       assertEquals("API key must be supported by the broker.", expectedApiVersion.apiKey, actualApiVersion.apiKey)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
index 177b509..b26f621 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -26,11 +26,11 @@ class ApiVersionsTest {
 
   @Test
   def testApiVersions {
-    val apiVersions = ApiVersionsResponse.apiVersionsResponse.apiVersions
+    val apiVersions = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions
     assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length)
 
     for (key <- ApiKeys.values) {
-      val version = ApiVersionsResponse.apiVersionsResponse.apiVersion(key.id)
+      val version = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(key.id)
       assertNotNull(s"Could not find ApiVersion for API ${key.name}", version)
       assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, Protocol.MIN_VERSIONS(key.id))
       assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, Protocol.CURR_VERSION(key.id))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b0b520/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 239a9f4..5dd09a1 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -78,7 +78,10 @@ LATEST_0_9 = V_0_9_0_1
 # 0.10.0.X versions
 V_0_10_0_0 = KafkaVersion("0.10.0.0")
 V_0_10_0_1 = KafkaVersion("0.10.0.1")
-# Adding 0.10.0 as the next version will be 0.10.1.x
 LATEST_0_10_0 = V_0_10_0_1
 
-LATEST_0_10 = LATEST_0_10_0
\ No newline at end of file
+# 0.10.1.x versions
+V_0_10_1_0 = KafkaVersion("0.10.1.0")
+LATEST_0_10_1 = V_0_10_1_0
+
+LATEST_0_10 = LATEST_0_10_1


Mime
View raw message