kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4820; ConsumerNetworkClient.send() should not require global lock
Date Sat, 04 Mar 2017 19:39:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5781feb52 -> adb70da13


KAFKA-4820; ConsumerNetworkClient.send() should not require global lock

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>,
Jiangjie Qin <becket.qin@gmail.com>

Closes #2619 from lindong28/KAFKA-4820


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/adb70da1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/adb70da1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/adb70da1

Branch: refs/heads/trunk
Commit: adb70da13e18eb652e734887b430ac0ecbc5f9e6
Parents: 5781feb
Author: Dong Lin <lindong28@gmail.com>
Authored: Sat Mar 4 11:38:57 2017 -0800
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Sat Mar 4 11:38:57 2017 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   2 +-
 .../internals/ConsumerNetworkClient.java        | 210 +++++++++++++------
 .../clients/consumer/internals/Fetcher.java     |   2 +-
 3 files changed, 153 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/adb70da1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b4514c5..51b00af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1004,7 +1004,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                     //
                     // NOTE: since the consumed position has already been updated, we must
not allow
                     // wakeups or any other errors to be triggered prior to returning the
fetched records.
-                    if (fetcher.sendFetches() > 0 || client.pendingRequestCount() >
0)
+                    if (fetcher.sendFetches() > 0 || client.hasPendingRequest())
                         client.pollNoWakeup();
 
                     if (this.interceptors == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/adb70da1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 8781676..2fa7667 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -35,10 +35,13 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.common.errors.InterruptException;
@@ -55,7 +58,7 @@ public class ConsumerNetworkClient implements Closeable {
     // the mutable state of this class is protected by the object's monitor (excluding the
wakeup
     // flag and the request completion queue below).
     private final KafkaClient client;
-    private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
+    private final UnsentRequests unsent = new UnsentRequests();
     private final Metadata metadata;
     private final Time time;
     private final long retryBackoffMs;
@@ -99,24 +102,13 @@ public class ConsumerNetworkClient implements Closeable {
         RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
         ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder,
now, true,
                 completionHandler);
-        put(node, clientRequest);
+        unsent.put(node, clientRequest);
 
         // wakeup the client in case it is blocking in poll so that we can send the queued
request
         client.wakeup();
         return completionHandler.future;
     }
 
-    private void put(Node node, ClientRequest request) {
-        synchronized (this) {
-            List<ClientRequest> nodeUnsent = unsent.get(node);
-            if (nodeUnsent == null) {
-                nodeUnsent = new ArrayList<>();
-                unsent.put(node, nodeUnsent);
-            }
-            nodeUnsent.add(request);
-        }
-    }
-
     public Node leastLoadedNode() {
         synchronized (this) {
             return client.leastLoadedNode(time.milliseconds());
@@ -280,12 +272,12 @@ public class ConsumerNetworkClient implements Closeable {
         long startMs = time.milliseconds();
         long remainingMs = timeoutMs;
 
-        while (pendingRequestCount(node) > 0 && remainingMs > 0) {
+        while (hasPendingRequest(node) && remainingMs > 0) {
             poll(remainingMs);
             remainingMs = timeoutMs - (time.milliseconds() - startMs);
         }
 
-        return pendingRequestCount(node) == 0;
+        return !hasPendingRequest(node);
     }
 
     /**
@@ -296,9 +288,21 @@ public class ConsumerNetworkClient implements Closeable {
      */
     public int pendingRequestCount(Node node) {
         synchronized (this) {
-            List<ClientRequest> pending = unsent.get(node);
-            int unsentCount = pending == null ? 0 : pending.size();
-            return unsentCount + client.inFlightRequestCount(node.idString());
+            return unsent.getRequestCount(node) + client.inFlightRequestCount(node.idString());
+        }
+    }
+
+    /**
+     * Check whether there is a pending request to the given node. This includes both requests
that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @param node The node in question
+     * @return A boolean indicating whether there is a pending request
+     */
+    public boolean hasPendingRequest(Node node) {
+        if (unsent.hasRequest(node))
+            return true;
+        synchronized (this) {
+            return client.inFlightRequestCount(node.idString()) > 0;
         }
     }
 
@@ -309,10 +313,20 @@ public class ConsumerNetworkClient implements Closeable {
      */
     public int pendingRequestCount() {
         synchronized (this) {
-            int total = 0;
-            for (List<ClientRequest> requests: unsent.values())
-                total += requests.size();
-            return total + client.inFlightRequestCount();
+            return unsent.getRequestCount() + client.inFlightRequestCount();
+        }
+    }
+
+    /**
+     * Check whether there is a pending request. This includes both requests that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @return A boolean indicating whether there is a pending request
+     */
+    public boolean hasPendingRequest() {
+        if (unsent.hasRequest())
+            return true;
+        synchronized (this) {
+            return client.inFlightRequestCount() > 0;
         }
     }
 
@@ -337,19 +351,17 @@ public class ConsumerNetworkClient implements Closeable {
         // by NetworkClient, so we just need to check whether connections for any of the
unsent
         // requests have been disconnected; if they have, then we complete the corresponding
future
         // and set the disconnect flag in the ClientResponse
-        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
-            Node node = requestEntry.getKey();
+        Set<Node> nodes = unsent.getNodes();
+        for (Node node: nodes) {
             if (client.connectionFailed(node)) {
                 // Remove entry before invoking request callback to avoid callbacks handling
                 // coordinator failures traversing the unsent list again.
-                iterator.remove();
-                for (ClientRequest request : requestEntry.getValue()) {
+                List<ClientRequest> requests = unsent.remove(node);
+                for (ClientRequest request : requests) {
                     RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)
request.callback();
                     handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
-                            request.callback(), request.destination(), request.createdTimeMs(),
now, true,
-                            null, null));
+                        request.callback(), request.destination(), request.createdTimeMs(),
now, true,
+                        null, null));
                 }
             }
         }
@@ -357,21 +369,10 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void failExpiredRequests(long now) {
         // clear all expired unsent requests and fail their corresponding futures
-        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
-            Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
-            while (requestIterator.hasNext()) {
-                ClientRequest request = requestIterator.next();
-                if (request.createdTimeMs() < now - unsentExpiryMs) {
-                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)
request.callback();
-                    handler.onFailure(new TimeoutException("Failed to send request after
" + unsentExpiryMs + " ms."));
-                    requestIterator.remove();
-                } else
-                    break;
-            }
-            if (requestEntry.getValue().isEmpty())
-                iterator.remove();
+        List<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
+        for (ClientRequest request: expiredRequests) {
+            RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+            handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs
+ " ms."));
         }
     }
 
@@ -379,11 +380,9 @@ public class ConsumerNetworkClient implements Closeable {
         // clear unsent requests to node and fail their corresponding futures
         synchronized (this) {
             List<ClientRequest> unsentRequests = unsent.remove(node);
-            if (unsentRequests != null) {
-                for (ClientRequest unsentRequest : unsentRequests) {
-                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)
unsentRequest.callback();
-                    handler.onFailure(e);
-                }
+            for (ClientRequest unsentRequest : unsentRequests) {
+                RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)
unsentRequest.callback();
+                handler.onFailure(e);
             }
         }
 
@@ -394,15 +393,25 @@ public class ConsumerNetworkClient implements Closeable {
     private boolean trySend(long now) {
         // send any requests that can be sent now
         boolean requestsSent = false;
-        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet())
{
-            Node node = requestEntry.getKey();
-            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
-            while (iterator.hasNext()) {
-                ClientRequest request = iterator.next();
-                if (client.ready(node, now)) {
-                    client.send(request, now);
-                    iterator.remove();
-                    requestsSent = true;
+        Set<Node> nodes = unsent.getNodes();
+        for (Node node: nodes) {
+            if (client.ready(node, now)) {
+                // Remove entry before invoking request callback to avoid callbacks handling
+                // coordinator failures traversing the unsent list again.
+                List<ClientRequest> requests = unsent.remove(node);
+                try {
+                    Iterator<ClientRequest> iterator = requests.iterator();
+                    while (iterator.hasNext()) {
+                        ClientRequest request = iterator.next();
+                        if (!client.ready(node, now))
+                            break;
+                        client.send(request, now);
+                        requestsSent = true;
+                        iterator.remove();
+                    }
+                } finally {
+                    if (!requests.isEmpty())
+                        unsent.put(node, requests);
                 }
             }
         }
@@ -527,4 +536,87 @@ public class ConsumerNetworkClient implements Closeable {
         boolean shouldBlock();
     }
 
+
+    /*
+     * A thread-safe helper class that holds, per node, the requests that have not been sent yet
+     */
+    private final static class UnsentRequests {
+        private final Map<Node, List<ClientRequest>> unsent;
+
+        public UnsentRequests() {
+            unsent = new HashMap<>();
+        }
+
+        public synchronized void put(Node node, List<ClientRequest> requests) {
+            List<ClientRequest> nodeUnsent = unsent.get(node);
+            if (nodeUnsent == null) {
+                nodeUnsent = new ArrayList<>();
+                unsent.put(node, nodeUnsent);
+            }
+            nodeUnsent.addAll(requests);
+        }
+
+        public synchronized void put(Node node, ClientRequest request) {
+            List<ClientRequest> nodeUnsent = unsent.get(node);
+            if (nodeUnsent == null) {
+                nodeUnsent = new ArrayList<>();
+                unsent.put(node, nodeUnsent);
+            }
+            nodeUnsent.add(request);
+        }
+
+        public synchronized int getRequestCount(Node node) {
+            List<ClientRequest> requests = unsent.get(node);
+            return requests == null ? 0 : requests.size();
+        }
+
+        public synchronized int getRequestCount() {
+            int total = 0;
+            for (List<ClientRequest> requests : unsent.values())
+                total += requests.size();
+            return total;
+        }
+
+        public synchronized boolean hasRequest(Node node) {
+            List<ClientRequest> requests = unsent.get(node);
+            return requests != null && !requests.isEmpty();
+        }
+
+        public synchronized boolean hasRequest() {
+            for (List<ClientRequest> requests : unsent.values())
+                if (!requests.isEmpty())
+                    return true;
+            return false;
+        }
+
+        public synchronized List<ClientRequest> removeExpiredRequests(long now, long
unsentExpiryMs) {
+            List<ClientRequest> expiredRequests = new ArrayList<>();
+            Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
+                Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
+                while (requestIterator.hasNext()) {
+                    ClientRequest request = requestIterator.next();
+                    if (request.createdTimeMs() < now - unsentExpiryMs) {
+                        expiredRequests.add(request);
+                        requestIterator.remove();
+                    } else
+                        break;
+                }
+                if (requestEntry.getValue().isEmpty())
+                    iterator.remove();
+            }
+            return expiredRequests;
+        }
+
+        public synchronized List<ClientRequest> remove(Node node) {
+            List<ClientRequest> requests = unsent.remove(node);
+            return requests == null ? Collections.<ClientRequest>emptyList() : requests;
+        }
+
+        public synchronized Set<Node> getNodes() {
+            return new HashSet<>(unsent.keySet());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adb70da1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8a8952c..536e4e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -713,7 +713,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener
{
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
-            } else if (this.client.pendingRequestCount(node) == 0) {
+            } else if (!this.client.hasPendingRequest(node)) {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                 if (fetch == null) {


Mime
View raw message