kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7231; Ensure NetworkClient uses overridden request timeout (#5444)
Date Thu, 02 Aug 2018 18:05:55 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 0417183  KAFKA-7231; Ensure NetworkClient uses overridden request timeout (#5444)
0417183 is described below

commit 0417183da0c32174a1f55d990117403d91f85fff
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Aug 2 11:02:38 2018 -0700

    KAFKA-7231; Ensure NetworkClient uses overridden request timeout (#5444)
    
    Fixed incorrect use of default timeout instead of the argument explicitly passed to `newClientRequest`.
    
    Reviewers: Ron Dagostino <rndgstn@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../org/apache/kafka/clients/NetworkClient.java    |  2 +-
 .../apache/kafka/clients/NetworkClientTest.java    | 36 ++++++++++++----------
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index fd16fe6..e4ba197 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1060,7 +1060,7 @@ public class NetworkClient implements KafkaClient {
                                           int requestTimeoutMs,
                                           RequestCompletionHandler callback) {
         return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs, expectResponse,
-                defaultRequestTimeoutMs, callback);
+                requestTimeoutMs, callback);
     }
 
     public boolean discoverBrokerVersions() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e13fcef..2876570 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -52,7 +52,7 @@ import static org.junit.Assert.assertTrue;
 
 public class NetworkClientTest {
 
-    protected final int minRequestTimeoutMs = 1000;
+    protected final int defaultRequestTimeoutMs = 1000;
     protected final MockTime time = new MockTime();
     protected final MockSelector selector = new MockSelector(time);
     protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
@@ -70,19 +70,19 @@ public class NetworkClientTest {
     private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
-                minRequestTimeoutMs, time, true, new ApiVersions(), new LogContext());
+                defaultRequestTimeoutMs, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-                "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, minRequestTimeoutMs,
+                "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
                 time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
-                64 * 1024, 64 * 1024, minRequestTimeoutMs, time, false, new ApiVersions(), new LogContext());
+                64 * 1024, 64 * 1024, defaultRequestTimeoutMs, time, false, new ApiVersions(), new LogContext());
     }
 
     @Before
@@ -144,7 +144,7 @@ public class NetworkClientTest {
                         Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = networkClient.newClientRequest(
-                node.idString(), builder, time.milliseconds(), true, minRequestTimeoutMs, handler);
+                node.idString(), builder, time.milliseconds(), true, defaultRequestTimeoutMs, handler);
         networkClient.send(request, time.milliseconds());
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
@@ -187,18 +187,20 @@ public class NetworkClientTest {
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
                 1000, Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
-        int requestTimeoutMs = minRequestTimeoutMs + 5000;
+        int requestTimeoutMs = defaultRequestTimeoutMs + 5000;
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
                 requestTimeoutMs, handler);
+        assertEquals(requestTimeoutMs, request.requestTimeoutMs());
         testRequestTimeout(request);
     }
 
     @Test
-    public void testMinRequestTimeout() {
+    public void testDefaultRequestTimeout() {
         awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
                 1000, Collections.emptyMap());
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
+        assertEquals(defaultRequestTimeoutMs, request.requestTimeoutMs());
         testRequestTimeout(request);
     }
 
@@ -222,7 +224,7 @@ public class NetworkClientTest {
             Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
-                minRequestTimeoutMs, handler);
+                defaultRequestTimeoutMs, handler);
         client.send(request, time.milliseconds());
         client.poll(1, time.milliseconds());
         ResponseHeader respHeader = new ResponseHeader(request.correlationId());
@@ -281,7 +283,7 @@ public class NetworkClientTest {
             Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
-                minRequestTimeoutMs, handler);
+                defaultRequestTimeoutMs, handler);
         client.send(request, time.milliseconds());
         client.poll(1, time.milliseconds());
         ResponseHeader respHeader = new ResponseHeader(request.correlationId());
@@ -349,7 +351,7 @@ public class NetworkClientTest {
         awaitReady(clientWithNoExponentialBackoff, node);
 
         selector.serverDisconnect(node.idString());
-        clientWithNoExponentialBackoff.poll(minRequestTimeoutMs, time.milliseconds());
+        clientWithNoExponentialBackoff.poll(defaultRequestTimeoutMs, time.milliseconds());
         long delay = clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds());
 
         assertEquals(reconnectBackoffMsTest, delay);
@@ -361,7 +363,7 @@ public class NetworkClientTest {
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
         selector.serverDisconnect(node.idString());
-        client.poll(minRequestTimeoutMs, time.milliseconds());
+        client.poll(defaultRequestTimeoutMs, time.milliseconds());
 
         // Second attempt should have the same behaviour as exponential backoff is disabled
         assertEquals(reconnectBackoffMsTest, delay);
@@ -391,7 +393,7 @@ public class NetworkClientTest {
 
         // First disconnection
         selector.serverDisconnect(node.idString());
-        client.poll(minRequestTimeoutMs, time.milliseconds());
+        client.poll(defaultRequestTimeoutMs, time.milliseconds());
         long delay = client.connectionDelay(node, time.milliseconds());
         long expectedDelay = reconnectBackoffMsTest;
         double jitter = 0.3;
@@ -404,7 +406,7 @@ public class NetworkClientTest {
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
         selector.serverDisconnect(node.idString());
-        client.poll(minRequestTimeoutMs, time.milliseconds());
+        client.poll(defaultRequestTimeoutMs, time.milliseconds());
 
         // Second attempt should take twice as long with twice the jitter
         expectedDelay = Math.round(delay * 2);
@@ -423,13 +425,13 @@ public class NetworkClientTest {
         long now = time.milliseconds();
         ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
         client.send(request, now);
-        client.poll(minRequestTimeoutMs, now);
+        client.poll(defaultRequestTimeoutMs, now);
         assertEquals(1, client.inFlightRequestCount(node.idString()));
         assertTrue(client.hasInFlightRequests(node.idString()));
         assertTrue(client.hasInFlightRequests());
 
         selector.close(node.idString());
-        List<ClientResponse> responses = client.poll(minRequestTimeoutMs, time.milliseconds());
+        List<ClientResponse> responses = client.poll(defaultRequestTimeoutMs, time.milliseconds());
         assertEquals(1, responses.size());
         assertTrue(responses.iterator().next().wasDisconnected());
     }
@@ -474,11 +476,11 @@ public class NetworkClientTest {
             }
         };
 
-        ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, minRequestTimeoutMs, callback);
+        ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, callback);
         client.send(request1, now);
         client.poll(0, now);
 
-        ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, minRequestTimeoutMs, callback);
+        ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, callback);
         client.send(request2, now);
         client.poll(0, now);
 


Mime
View raw message