kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: KAFKA-6289; NetworkClient should not expose failed internal ApiVersions requests
Date Fri, 08 Dec 2017 10:58:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 305e7949c -> 46fc8eca9


KAFKA-6289; NetworkClient should not expose failed internal ApiVersions requests

The NetworkClient internally ApiVersion requests to each broker following connection establishment.
If this request happens to fail (perhaps due to an incompatible broker), the NetworkClient
includes the response in the result of poll(). Applications will generally not be expecting
this response which may lead to failed assertions (or in the case of AdminClient, an obscure
log message).

I've added test cases which await the ApiVersion request sent by NetworkClient to be in-flight,
and then disconnect the connection and verify that the response is not included from poll().

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #4280 from hachikuji/KAFKA-6289


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46fc8eca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46fc8eca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46fc8eca

Branch: refs/heads/1.0
Commit: 46fc8eca933cdbce705474f595ef7d67e59364b3
Parents: 305e794
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Dec 8 10:54:31 2017 +0000
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Fri Dec 8 10:59:24 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 10 ++--
 .../apache/kafka/clients/NetworkClientTest.java | 60 ++++++++++++++++----
 .../org/apache/kafka/test/MockSelector.java     | 25 +++++++-
 3 files changed, 78 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index ee7258a..ea4eacc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -618,12 +618,12 @@ public class NetworkClient implements KafkaClient {
                 break; // Disconnections in other states are logged at debug level in Selector
         }
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
-            log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected",
request.request,
-                    request.header.correlationId(), nodeId);
-            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA)
-                metadataUpdater.handleDisconnection(request.destination);
-            else
+            log.trace("Cancelled request {} {} with correlation id {} due to node {} being
disconnected",
+                    request.header.apiKey(), request.request, request.header.correlationId(),
nodeId);
+            if (!request.isInternalRequest)
                 responses.add(request.disconnected(now));
+            else if (request.header.apiKey() == ApiKeys.METADATA)
+                metadataUpdater.handleDisconnection(request.destination);
         }
         AuthenticationException authenticationException = connectionStates.authenticationException(nodeId);
         if (authenticationException != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index edbd72d..0bf0a69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -159,7 +160,7 @@ public class NetworkClientTest {
                 request.correlationId(), handler.response.requestHeader().correlationId());
     }
 
-    private void maybeSetExpectedApiVersionsResponse() {
+    private void setExpectedApiVersionsResponse() {
         ApiVersionsResponse response = ApiVersionsResponse.defaultApiVersionsResponse();
         short apiVersionsResponseVersion = response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
         ByteBuffer buffer = response.serialize(apiVersionsResponseVersion, new ResponseHeader(0));
@@ -168,7 +169,7 @@ public class NetworkClientTest {
 
     private void awaitReady(NetworkClient client, Node node) {
         if (client.discoverBrokerVersions()) {
-            maybeSetExpectedApiVersionsResponse();
+            setExpectedApiVersionsResponse();
         }
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
@@ -185,11 +186,15 @@ public class NetworkClientTest {
         ClientRequest request = client.newClientRequest(
                 node.idString(), builder, now, true, handler);
         client.send(request, now);
+
         // sleeping to make sure that the time since last send is greater than requestTimeOut
         time.sleep(3000);
-        client.poll(3000, time.milliseconds());
-        assertEquals(1, selector.disconnected().size());
-        assertTrue("Node not found in disconnected map", selector.disconnected().containsKey(node.idString()));
+        List<ClientResponse> responses = client.poll(3000, time.milliseconds());
+
+        assertEquals(1, responses.size());
+        ClientResponse clientResponse = responses.get(0);
+        assertEquals(node.idString(), clientResponse.destination());
+        assertTrue("Expected response to fail due to disconnection", clientResponse.wasDisconnected());
     }
 
     @Test
@@ -207,13 +212,12 @@ public class NetworkClientTest {
         time.sleep(reconnectBackoffMsTest);
         
         // CLOSE node 
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         
         client.poll(1, time.milliseconds());
         assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node,
time.milliseconds()));
         leastNode = client.leastLoadedNode(time.milliseconds());
         assertEquals("There should be NO leastloadednode", leastNode, null);
-        
     }
 
     @Test
@@ -238,7 +242,7 @@ public class NetworkClientTest {
     public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
         awaitReady(clientWithNoExponentialBackoff, node);
 
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         clientWithNoExponentialBackoff.poll(requestTimeoutMs, time.milliseconds());
         long delay = clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds());
 
@@ -250,7 +254,7 @@ public class NetworkClientTest {
 
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         client.poll(requestTimeoutMs, time.milliseconds());
 
         // Second attempt should have the same behaviour as exponential backoff is disabled
@@ -280,7 +284,7 @@ public class NetworkClientTest {
         awaitReady(client, node);
 
         // First disconnection
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         client.poll(requestTimeoutMs, time.milliseconds());
         long delay = client.connectionDelay(node, time.milliseconds());
         long expectedDelay = reconnectBackoffMsTest;
@@ -293,7 +297,7 @@ public class NetworkClientTest {
 
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         client.poll(requestTimeoutMs, time.milliseconds());
 
         // Second attempt should take twice as long with twice the jitter
@@ -325,6 +329,28 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception {
+        awaitInFlightApiVersionRequest();
+        selector.serverDisconnect(node.idString());
+
+        // The failed ApiVersion request should not be forwarded to upper layers
+        List<ClientResponse> responses = client.poll(0, time.milliseconds());
+        assertFalse(client.hasInFlightRequests(node.idString()));
+        assertTrue(responses.isEmpty());
+    }
+
+    @Test
+    public void testClientDisconnectAfterInternalApiVersionRequest() throws Exception {
+        awaitInFlightApiVersionRequest();
+        client.disconnect(node.idString());
+        assertFalse(client.hasInFlightRequests(node.idString()));
+
+        // The failed ApiVersion request should not be forwarded to upper layers
+        List<ClientResponse> responses = client.poll(0, time.milliseconds());
+        assertTrue(responses.isEmpty());
+    }
+
+    @Test
     public void testCallDisconnect() throws Exception {
         awaitReady(client, node);
         assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
@@ -345,6 +371,18 @@ public class NetworkClientTest {
         assertTrue(client.canConnect(node, time.milliseconds()));
     }
 
+    private void awaitInFlightApiVersionRequest() throws Exception {
+        client.ready(node, time.milliseconds());
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                client.poll(0, time.milliseconds());
+                return client.hasInFlightRequests(node.idString());
+            }
+        }, 1000, "");
+        assertFalse(client.isReady(node, time.milliseconds()));
+    }
+
     private static class TestCallbackHandler implements RequestCompletionHandler {
         public boolean executed = false;
         public ClientResponse response;

http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 225aba4..6fc1b1b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -63,7 +64,11 @@ public class MockSelector implements Selectable {
 
     @Override
     public void close(String id) {
-        this.disconnected.put(id, ChannelState.LOCAL_CLOSE);
+        // Note that there are no notifications for client-side disconnects
+
+        removeSendsForNode(id, completedSends);
+        removeSendsForNode(id, initiatedSends);
+
         for (int i = 0; i < this.connected.size(); i++) {
             if (this.connected.get(i).equals(id)) {
                 this.connected.remove(i);
@@ -72,6 +77,24 @@ public class MockSelector implements Selectable {
         }
     }
 
+    /**
+     * Simulate a server disconnect. This id will be present in {@link #disconnected()} on
+     * the next {@link #poll(long)}.
+     */
+    public void serverDisconnect(String id) {
+        this.disconnected.put(id, ChannelState.READY);
+        close(id);
+    }
+
+    private void removeSendsForNode(String id, Collection<Send> sends) {
+        Iterator<Send> iter = sends.iterator();
+        while (iter.hasNext()) {
+            Send send = iter.next();
+            if (id.equals(send.destination()))
+                iter.remove();
+        }
+    }
+
     public void clear() {
         this.completedSends.clear();
         this.completedReceives.clear();


Mime
View raw message