kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2519; NetworkClient.close should remove node from inFlightRequests
Date Fri, 04 Sep 2015 16:30:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4ffbfefcc -> f25731265


KAFKA-2519; NetworkClient.close should remove node from inFlightRequests

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #193 from ijuma/kafka-2519-network-client-close-remove-in-flight


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f2573126
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f2573126
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f2573126

Branch: refs/heads/trunk
Commit: f25731265ef63e5440a188b3e5553c31fb9a397b
Parents: 4ffbfef
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Sep 4 09:30:13 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Sep 4 09:30:13 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java  |  1 +
 .../apache/kafka/clients/NetworkClientTest.java  | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f2573126/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0a6f952..049b22e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -152,6 +152,7 @@ public class NetworkClient implements KafkaClient {
     @Override
     public void close(String nodeId) {
         selector.close(nodeId);
+        inFlightRequests.clearAll(nodeId);
         connectionStates.remove(nodeId);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2573126/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index ce6328a..69c93c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -94,6 +94,25 @@ public class NetworkClientTest {
         checkSimpleRequestResponse(clientWithStaticNodes);
     }
 
+    @Test
+    public void testClose() {
+        client.ready(node, time.milliseconds());
+        awaitReady(client, node);
+        client.poll(1, time.milliseconds());
+        assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
+
+        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
+        RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
+        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+        client.send(request);
+        assertEquals("There should be 1 in-flight request after send", 1, client.inFlightRequestCount(node.idString()));
+
+        client.close(node.idString());
+        assertEquals("There should be no in-flight request after close", 0, client.inFlightRequestCount(node.idString()));
+        assertFalse("Connection should not be ready after close", client.isReady(node, 0));
+    }
+
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
         RequestHeader reqHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);


Mime
View raw message