kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4841; NetworkClient should only consider a connection to have failed after attempt to connect
Date Wed, 08 Mar 2017 01:11:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 146edd530 -> c6bccddb9


KAFKA-4841; NetworkClient should only consider a connection to have failed after attempt to
connect

Also fix a potential reordering bug and include a few clean-ups.

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>, Jason Gustafson <jason@confluent.io>,
Ismael Juma <ismael@juma.me.uk>

Closes #2641 from lindong28/KAFKA-4820-followup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6bccddb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6bccddb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6bccddb

Branch: refs/heads/trunk
Commit: c6bccddb94affe8f85110004341f75bd66b839eb
Parents: 146edd5
Author: Dong Lin <lindong28@gmail.com>
Authored: Wed Mar 8 00:44:28 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Mar 8 01:01:11 2017 +0000

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  | 13 ++-
 .../org/apache/kafka/clients/NetworkClient.java |  2 +-
 .../internals/ConsumerNetworkClient.java        | 95 +++++++++-----------
 3 files changed, 52 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6bccddb/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 0ce7993..9bde1a2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -36,7 +36,7 @@ final class ClusterConnectionStates {
      * Return true iff we can currently initiate a new connection. This will be the case
if we are not
      * connected and haven't been connected for at least the minimum reconnection backoff
period.
      * @param id the connection id to check
-     * @param now the current time in MS
+     * @param now the current time in ms
      * @return true if we can initiate a new connection
      */
     public boolean canConnect(String id, long now) {
@@ -137,6 +137,15 @@ final class ClusterConnectionStates {
     }
 
     /**
+     * Return true if the connection has been disconnected
+     * @param id The id of the node to check
+     */
+    public boolean isDisconnected(String id) {
+        NodeConnectionState state = nodeState.get(id);
+        return state != null && state.state == ConnectionState.DISCONNECTED;
+    }
+
+    /**
      * Remove the given node from the tracked connection states. The main difference between
this and `disconnected`
      * is the impact on `connectionDelay`: it will be 0 after this call whereas `reconnectBackoffMs`
will be taken
      * into account after `disconnected` is called.
@@ -155,7 +164,7 @@ final class ClusterConnectionStates {
     public ConnectionState connectionState(String id) {
         return nodeState(id).state;
     }
-    
+
     /**
      * Get the state of a given node.
      * @param id the connection to fetch the state for

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6bccddb/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8d3adda..c76a738 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -229,7 +229,7 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public boolean connectionFailed(Node node) {
-        return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
+        return connectionStates.isDisconnected(node.idString());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6bccddb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 2fa7667..eb25359 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -35,13 +35,12 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.common.errors.InterruptException;
@@ -288,7 +287,7 @@ public class ConsumerNetworkClient implements Closeable {
      */
     public int pendingRequestCount(Node node) {
         synchronized (this) {
-            return unsent.getRequestCount(node) + client.inFlightRequestCount(node.idString());
+            return unsent.requestCount(node) + client.inFlightRequestCount(node.idString());
         }
     }
 
@@ -313,7 +312,7 @@ public class ConsumerNetworkClient implements Closeable {
      */
     public int pendingRequestCount() {
         synchronized (this) {
-            return unsent.getRequestCount() + client.inFlightRequestCount();
+            return unsent.requestCount() + client.inFlightRequestCount();
         }
     }
 
@@ -351,12 +350,12 @@ public class ConsumerNetworkClient implements Closeable {
         // by NetworkClient, so we just need to check whether connections for any of the
unsent
         // requests have been disconnected; if they have, then we complete the corresponding
future
         // and set the disconnect flag in the ClientResponse
-        Set<Node> nodes = unsent.getNodes();
-        for (Node node: nodes) {
+        List<Node> nodes = unsent.nodes();
+        for (Node node : nodes) {
             if (client.connectionFailed(node)) {
                 // Remove entry before invoking request callback to avoid callbacks handling
                 // coordinator failures traversing the unsent list again.
-                List<ClientRequest> requests = unsent.remove(node);
+                Collection<ClientRequest> requests = unsent.remove(node);
                 for (ClientRequest request : requests) {
                     RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)
request.callback();
                     handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
@@ -370,7 +369,7 @@ public class ConsumerNetworkClient implements Closeable {
     private void failExpiredRequests(long now) {
         // clear all expired unsent requests and fail their corresponding futures
         List<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
-        for (ClientRequest request: expiredRequests) {
+        for (ClientRequest request : expiredRequests) {
             RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
             handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs
+ " ms."));
         }
@@ -379,7 +378,7 @@ public class ConsumerNetworkClient implements Closeable {
     public void failUnsentRequests(Node node, RuntimeException e) {
         // clear unsent requests to node and fail their corresponding futures
         synchronized (this) {
-            List<ClientRequest> unsentRequests = unsent.remove(node);
+            Collection<ClientRequest> unsentRequests = unsent.remove(node);
             for (ClientRequest unsentRequest : unsentRequests) {
                 RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)
unsentRequest.callback();
                 handler.onFailure(e);
@@ -393,25 +392,15 @@ public class ConsumerNetworkClient implements Closeable {
     private boolean trySend(long now) {
         // send any requests that can be sent now
         boolean requestsSent = false;
-        Set<Node> nodes = unsent.getNodes();
-        for (Node node: nodes) {
-            if (client.ready(node, now)) {
-                // Remove entry before invoking request callback to avoid callbacks handling
-                // coordinator failures traversing the unsent list again.
-                List<ClientRequest> requests = unsent.remove(node);
-                try {
-                    Iterator<ClientRequest> iterator = requests.iterator();
-                    while (iterator.hasNext()) {
-                        ClientRequest request = iterator.next();
-                        if (!client.ready(node, now))
-                            break;
-                        client.send(request, now);
-                        requestsSent = true;
-                        iterator.remove();
-                    }
-                } finally {
-                    if (!requests.isEmpty())
-                        unsent.put(node, requests);
+
+        for (Node node : unsent.nodes()) {
+            Iterator<ClientRequest> iterator = unsent.requestIterator(node);
+            while (iterator.hasNext()) {
+                ClientRequest request = iterator.next();
+                if (client.ready(node, now)) {
+                    client.send(request, now);
+                    iterator.remove();
+                    requestsSent = true;
                 }
             }
         }
@@ -538,52 +527,43 @@ public class ConsumerNetworkClient implements Closeable {
 
 
     /*
-     * A threadsafe helper class to hold requests per node that has not been sent yet
+     * A threadsafe helper class to hold requests per node that have not been sent yet
      */
     private final static class UnsentRequests {
-        private final Map<Node, List<ClientRequest>> unsent;
+        private final Map<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
 
         public UnsentRequests() {
             unsent = new HashMap<>();
         }
 
-        public synchronized void put(Node node, List<ClientRequest> requests) {
-            List<ClientRequest> nodeUnsent = unsent.get(node);
-            if (nodeUnsent == null) {
-                nodeUnsent = new ArrayList<>();
-                unsent.put(node, nodeUnsent);
-            }
-            nodeUnsent.addAll(requests);
-        }
-
         public synchronized void put(Node node, ClientRequest request) {
-            List<ClientRequest> nodeUnsent = unsent.get(node);
+            ConcurrentLinkedQueue<ClientRequest> nodeUnsent = unsent.get(node);
             if (nodeUnsent == null) {
-                nodeUnsent = new ArrayList<>();
+                nodeUnsent = new ConcurrentLinkedQueue<>();
                 unsent.put(node, nodeUnsent);
             }
             nodeUnsent.add(request);
         }
 
-        public synchronized int getRequestCount(Node node) {
-            List<ClientRequest> requests = unsent.get(node);
+        public synchronized int requestCount(Node node) {
+            ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
             return requests == null ? 0 : requests.size();
         }
 
-        public synchronized int getRequestCount() {
+        public synchronized int requestCount() {
             int total = 0;
-            for (List<ClientRequest> requests : unsent.values())
+            for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
                 total += requests.size();
             return total;
         }
 
         public synchronized boolean hasRequest(Node node) {
-            List<ClientRequest> requests = unsent.get(node);
+            ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
             return requests != null && !requests.isEmpty();
         }
 
         public synchronized boolean hasRequest() {
-            for (List<ClientRequest> requests : unsent.values())
+            for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
                 if (!requests.isEmpty())
                     return true;
             return false;
@@ -591,10 +571,10 @@ public class ConsumerNetworkClient implements Closeable {
 
         public synchronized List<ClientRequest> removeExpiredRequests(long now, long
unsentExpiryMs) {
             List<ClientRequest> expiredRequests = new ArrayList<>();
-            Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+            Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator();
             while (iterator.hasNext()) {
-                Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
-                Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
+                ConcurrentLinkedQueue<ClientRequest> requests = iterator.next();
+                Iterator<ClientRequest> requestIterator = requests.iterator();
                 while (requestIterator.hasNext()) {
                     ClientRequest request = requestIterator.next();
                     if (request.createdTimeMs() < now - unsentExpiryMs) {
@@ -603,19 +583,24 @@ public class ConsumerNetworkClient implements Closeable {
                     } else
                         break;
                 }
-                if (requestEntry.getValue().isEmpty())
+                if (requests.isEmpty())
                     iterator.remove();
             }
             return expiredRequests;
         }
 
-        public synchronized List<ClientRequest> remove(Node node) {
-            List<ClientRequest> requests = unsent.remove(node);
+        public synchronized Collection<ClientRequest> remove(Node node) {
+            ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
             return requests == null ? Collections.<ClientRequest>emptyList() : requests;
         }
 
-        public synchronized Set<Node> getNodes() {
-            return new HashSet<>(unsent.keySet());
+        public synchronized Iterator<ClientRequest> requestIterator(Node node) {
+            ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
+            return requests == null ? Collections.<ClientRequest>emptyIterator() :
requests.iterator();
+        }
+
+        public synchronized List<Node> nodes() {
+            return new ArrayList<>(unsent.keySet());
         }
     }
 


Mime
View raw message