kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: MINOR: Introduce NetworkClient.hasInFlightRequests
Date Thu, 09 Mar 2017 17:46:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 294018a57 -> 65650ba4d


MINOR: Introduce NetworkClient.hasInFlightRequests

It’s a minor optimisation, but simple enough.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #2658 from ijuma/has-in-flight-requests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65650ba4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65650ba4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65650ba4

Branch: refs/heads/trunk
Commit: 65650ba4dcba8a9729cb9cb6477a62a7b7c3714e
Parents: 294018a
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Mar 9 09:27:33 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Mar 9 09:27:33 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/InFlightRequests.java  | 27 +++++++++++++++++---
 .../org/apache/kafka/clients/KafkaClient.java   | 16 +++++++++---
 .../org/apache/kafka/clients/NetworkClient.java | 16 +++++++++---
 .../internals/ConsumerNetworkClient.java        |  4 +--
 .../org/apache/kafka/clients/MockClient.java    | 17 +++++++++++-
 .../apache/kafka/clients/NetworkClientTest.java |  3 +++
 .../clients/producer/internals/SenderTest.java  | 11 +++++++-
 7 files changed, 80 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65650ba4/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 06b8fed..a29075d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -55,7 +55,7 @@ final class InFlightRequests {
     private Deque<NetworkClient.InFlightRequest> requestQueue(String node) {
         Deque<NetworkClient.InFlightRequest> reqs = requests.get(node);
         if (reqs == null || reqs.isEmpty())
-            throw new IllegalStateException("Response from server for which there are no
in-flight requests.");
+            throw new IllegalStateException("There are no in-flight requests for node " +
node);
         return reqs;
     }
 
@@ -96,19 +96,27 @@ final class InFlightRequests {
     }
 
     /**
-     * Return the number of inflight requests directed at the given node
+     * Return the number of in-flight requests directed at the given node
      * @param node The node
      * @return The request count.
      */
-    public int inFlightRequestCount(String node) {
+    public int count(String node) {
         Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
         return queue == null ? 0 : queue.size();
     }
 
     /**
+     * Return true if there is no in-flight request directed at the given node and false
otherwise
+     */
+    public boolean isEmpty(String node) {
+        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
+        return queue != null && !queue.isEmpty();
+    }
+
+    /**
      * Count all in-flight requests for all nodes
      */
-    public int inFlightRequestCount() {
+    public int count() {
         int total = 0;
         for (Deque<NetworkClient.InFlightRequest> deque : this.requests.values())
             total += deque.size();
@@ -116,6 +124,17 @@ final class InFlightRequests {
     }
 
     /**
+     * Return true if there is no in-flight request and false otherwise
+     */
+    public boolean isEmpty() {
+        for (Deque<NetworkClient.InFlightRequest> deque : this.requests.values()) {
+            if (!deque.isEmpty())
+                return false;
+        }
+        return false;
+    }
+
+    /**
      * Clear out all the in-flight requests for the given node and return them
      *
      * @param node The node

http://git-wip-us.apache.org/repos/asf/kafka/blob/65650ba4/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 86ffa49..83a0009 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -108,6 +108,11 @@ public interface KafkaClient extends Closeable {
     int inFlightRequestCount();
 
     /**
+     * Return true if there is at least one in-flight request and false otherwise.
+     */
+    boolean hasInFlightRequests();
+
+    /**
      * Get the total in-flight requests for a particular node
      * 
      * @param nodeId The id of the node
@@ -115,6 +120,11 @@ public interface KafkaClient extends Closeable {
     int inFlightRequestCount(String nodeId);
 
     /**
+     * Return true if there is at least one in-flight request for a particular node and false
otherwise.
+     */
+    boolean hasInFlightRequests(String nodeId);
+
+    /**
      * Wake up the client if it is currently blocked waiting for I/O
      */
     void wakeup();
@@ -139,7 +149,7 @@ public interface KafkaClient extends Closeable {
      * @param expectResponse true iff we expect a response
      * @param callback the callback to invoke when we get a response
      */
-    ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder,
-                                          long createdTimeMs, boolean expectResponse,
-                                          RequestCompletionHandler callback);
+    ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
+                                   boolean expectResponse, RequestCompletionHandler callback);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65650ba4/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index c76a738..b6f8b0e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -386,7 +386,12 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public int inFlightRequestCount() {
-        return this.inFlightRequests.inFlightRequestCount();
+        return this.inFlightRequests.count();
+    }
+
+    @Override
+    public boolean hasInFlightRequests() {
+        return !this.inFlightRequests.isEmpty();
     }
 
     /**
@@ -394,7 +399,12 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public int inFlightRequestCount(String node) {
-        return this.inFlightRequests.inFlightRequestCount(node);
+        return this.inFlightRequests.count(node);
+    }
+
+    @Override
+    public boolean hasInFlightRequests(String node) {
+        return this.inFlightRequests.isEmpty(node);
     }
 
     /**
@@ -431,7 +441,7 @@ public class NetworkClient implements KafkaClient {
         for (int i = 0; i < nodes.size(); i++) {
             int idx = (offset + i) % nodes.size();
             Node node = nodes.get(idx);
-            int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
+            int currInflight = this.inFlightRequests.count(node.idString());
             if (currInflight == 0 && this.connectionStates.isReady(node.idString()))
{
                 // if we find an established connection with no in-flight requests we can
stop right away
                 log.trace("Found least loaded node {} connected with no in-flight requests",
node);

http://git-wip-us.apache.org/repos/asf/kafka/blob/65650ba4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 478ed3f..890fe7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -304,7 +304,7 @@ public class ConsumerNetworkClient implements Closeable {
         if (unsent.hasRequests(node))
             return true;
         synchronized (this) {
-            return client.inFlightRequestCount(node.idString()) > 0;
+            return client.hasInFlightRequests(node.idString());
         }
     }
 
@@ -328,7 +328,7 @@ public class ConsumerNetworkClient implements Closeable {
         if (unsent.hasRequests())
             return true;
         synchronized (this) {
-            return client.inFlightRequestCount() > 0;
+            return client.hasInFlightRequests();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65650ba4/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 7e05881..3726f1a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -296,8 +296,23 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public boolean hasInFlightRequests() {
+        return !requests.isEmpty();
+    }
+
+    @Override
     public int inFlightRequestCount(String node) {
-        return requests.size();
+        int result = 0;
+        for (ClientRequest req : requests) {
+            if (req.destination().equals(node))
+                ++result;
+        }
+        return result;
+    }
+
+    @Override
+    public boolean hasInFlightRequests(String node) {
+        return inFlightRequestCount(node) > 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/65650ba4/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index d22c04a..de60f44 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -116,9 +116,11 @@ public class NetworkClientTest {
         client.send(request, time.milliseconds());
         assertEquals("There should be 1 in-flight request after send", 1,
                 client.inFlightRequestCount(node.idString()));
+        assertTrue(client.hasInFlightRequests(node.idString()));
 
         client.close(node.idString());
         assertEquals("There should be no in-flight request after close", 0, client.inFlightRequestCount(node.idString()));
+        assertFalse(client.hasInFlightRequests(node.idString()));
         assertFalse("Connection should not be ready after close", client.isReady(node, 0));
     }
 
@@ -247,6 +249,7 @@ public class NetworkClientTest {
         client.send(request, now);
         client.poll(requestTimeoutMs, now);
         assertEquals(1, client.inFlightRequestCount(node.idString()));
+        assertTrue(client.hasInFlightRequests(node.idString()));
 
         selector.close(node.idString());
         List<ClientResponse> responses = client.poll(requestTimeoutMs, time.milliseconds());

http://git-wip-us.apache.org/repos/asf/kafka/blob/65650ba4/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 50ea219..8a80790 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -103,9 +103,11 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
+        assertTrue(client.hasInFlightRequests());
         client.respond(produceResponse(tp, offset, Errors.NONE, 0));
         sender.run(time.milliseconds());
-        assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
+        assertEquals("All requests completed.", 0, client.inFlightRequestCount());
+        assertFalse(client.hasInFlightRequests());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
         assertEquals(offset, future.get().offset());
@@ -153,14 +155,17 @@ public class SenderTest {
             String id = client.requests().peek().destination();
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
+            assertTrue(client.hasInFlightRequests());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
             client.disconnect(id);
             assertEquals(0, client.inFlightRequestCount());
+            assertFalse(client.hasInFlightRequests());
             assertFalse("Client ready status should be false", client.isReady(node, 0L));
             sender.run(time.milliseconds()); // receive error
             sender.run(time.milliseconds()); // reconnect
             sender.run(time.milliseconds()); // resend
             assertEquals(1, client.inFlightRequestCount());
+            assertTrue(client.hasInFlightRequests());
             long offset = 0;
             client.respond(produceResponse(tp, offset, Errors.NONE, 0));
             sender.run(time.milliseconds());
@@ -212,6 +217,7 @@ public class SenderTest {
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
+            assertTrue(client.hasInFlightRequests());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
             time.sleep(900);
@@ -224,6 +230,7 @@ public class SenderTest {
             // Sender should not send the second message to node 0.
             sender.run(time.milliseconds());
             assertEquals(1, client.inFlightRequestCount());
+            assertTrue(client.hasInFlightRequests());
         } finally {
             m.close();
         }
@@ -246,6 +253,7 @@ public class SenderTest {
         client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
+        assertFalse(client.hasInFlightRequests());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
 
@@ -261,6 +269,7 @@ public class SenderTest {
         client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
+        assertFalse(client.hasInFlightRequests());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
     }


Mime
View raw message