kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2604; Remove `completeAll` and improve timeout passed to `Selector.poll` from `NetworkClient.poll`
Date Mon, 05 Oct 2015 18:57:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f9faf334b -> b88ba9a6b


KAFKA-2604; Remove `completeAll` and improve timeout passed to `Selector.poll` from `NetworkClient.poll`

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Guozhang Wang

Closes #272 from ijuma/kafka-2640-remove-complete-all-poll-timeout


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b88ba9a6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b88ba9a6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b88ba9a6

Branch: refs/heads/trunk
Commit: b88ba9a6bdde4d0828c513cde3251667ae13e655
Parents: f9faf33
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Oct 5 12:01:33 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Oct 5 12:01:33 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/KafkaClient.java   | 21 ++---------
 .../org/apache/kafka/clients/NetworkClient.java | 38 +++-----------------
 .../apache/kafka/common/network/Selector.java   | 13 ++++---
 .../org/apache/kafka/clients/MockClient.java    | 13 -------
 4 files changed, 16 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b88ba9a6/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 478368e..8c6e39a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -75,7 +75,9 @@ public interface KafkaClient extends Closeable {
     /**
      * Do actual reads and writes from sockets.
      * 
-     * @param timeout The maximum amount of time to wait for responses in ms
+     * @param timeout The maximum amount of time to wait for responses in ms, must be non-negative. The implementation
+     *                is free to use a lower value if appropriate (common reasons for this are a lower request or
+     *                metadata update timeout)
      * @param now The current time in ms
      * @throws IllegalStateException If a request is sent to an unready node
      */
@@ -89,23 +91,6 @@ public interface KafkaClient extends Closeable {
     public void close(String nodeId);
 
     /**
-     * Complete all in-flight requests for a given connection
-     * 
-     * @param id The connection to complete requests for
-     * @param now The current time in ms
-     * @return All requests that complete during this time period.
-     */
-    public List<ClientResponse> completeAll(String id, long now);
-
-    /**
-     * Complete all in-flight requests
-     * 
-     * @param now The current time in ms
-     * @return All requests that complete during this time period.
-     */
-    public List<ClientResponse> completeAll(long now);
-
-    /**
      * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection,
      * but will potentially choose a node for which we don't yet have a connection if all existing connections are in
      * use.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b88ba9a6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0275daf..6f39ac9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -246,7 +246,9 @@ public class NetworkClient implements KafkaClient {
     /**
      * Do actual reads and writes to sockets.
      *
-     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately
+     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
+     *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
+     *                metadata timeout
      * @param now The current time in milliseconds
      * @return The list of responses received
      */
@@ -254,7 +256,7 @@ public class NetworkClient implements KafkaClient {
     public List<ClientResponse> poll(long timeout, long now) {
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
         try {
-            this.selector.poll(Math.min(timeout, metadataTimeout));
+            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
         } catch (IOException e) {
             log.error("Unexpected error during I/O in producer network thread", e);
         }
@@ -282,38 +284,6 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Await all the outstanding responses for requests on the given connection
-     *
-     * @param node The node to block on
-     * @param now The current time in ms
-     * @return All the collected responses
-     */
-    @Override
-    public List<ClientResponse> completeAll(String node, long now) {
-        try {
-            this.selector.muteAll();
-            this.selector.unmute(node);
-            List<ClientResponse> responses = new ArrayList<ClientResponse>();
-            while (inFlightRequestCount(node) > 0)
-                responses.addAll(poll(Integer.MAX_VALUE, now));
-            return responses;
-        } finally {
-            this.selector.unmuteAll();
-        }
-    }
-
-    /**
-     * Wait for all outstanding requests to complete.
-     */
-    @Override
-    public List<ClientResponse> completeAll(long now) {
-        List<ClientResponse> responses = new ArrayList<ClientResponse>();
-        while (inFlightRequestCount() > 0)
-            responses.addAll(poll(Integer.MAX_VALUE, now));
-        return responses;
-    }
-
-    /**
      * Get the number of in-flight requests
      */
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b88ba9a6/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 9e52078..7cdc167 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -239,12 +239,15 @@ public class Selector implements Selectable {
      * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
      * and pop response and add to the completedReceives.
      *
-     * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
+     * @param timeout The amount of time to wait, in milliseconds, which must be non-negative
+     * @throws IllegalArgumentException If `timeout` is negative
      * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
      *         already an in-progress send
      */
     @Override
     public void poll(long timeout) throws IOException {
+        if (timeout < 0)
+            throw new IllegalArgumentException("timeout should be >= 0");
         clear();
         if (hasStagedReceives())
             timeout = 0;
@@ -414,15 +417,17 @@ public class Selector implements Selectable {
     /**
      * Check for data, waiting up to the given timeout.
      *
-     * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
+     * @param ms Length of time to wait, in milliseconds, which must be non-negative
      * @return The number of keys ready
+     * @throws IllegalArgumentException
      * @throws IOException
      */
     private int select(long ms) throws IOException {
+        if (ms < 0L)
+            throw new IllegalArgumentException("timeout should be >= 0");
+
         if (ms == 0L)
             return this.nioSelector.selectNow();
-        else if (ms < 0L)
-            return this.nioSelector.select();
         else
             return this.nioSelector.select(ms);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b88ba9a6/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 4dfdd2a..67d894d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -127,19 +127,6 @@ public class MockClient implements KafkaClient {
         return copy;
     }
 
-    @Override
-    public List<ClientResponse> completeAll(String node, long now) {
-        return completeAll(now);
-    }
-
-    @Override
-    public List<ClientResponse> completeAll(long now) {
-        List<ClientResponse> responses = poll(0, now);
-        if (requests.size() > 0)
-            throw new IllegalStateException("Requests without responses remain.");
-        return responses;
-    }
-
     public Queue<ClientRequest> requests() {
         return this.requests;
     }


Mime
View raw message