kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)
Date Wed, 06 Jun 2018 21:19:59 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 3d4353e  KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)
3d4353e is described below

commit 3d4353e49b777ac508783f2167e241e5912ef55e
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Mar 1 11:04:11 2018 -0800

    KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)
    
    Contention for the lock in ConsumerNetworkClient can lead to a livelock situation in which an active commitSync is unable to make progress because its completion is blocked in the heartbeat thread. The fix is twofold:
    
    1) We change ConsumerNetworkClient to use a fair lock to reduce the chance of each thread getting starved.
    2) We eliminate the dependence on the lock in ConsumerNetworkClient for callback completion so that callbacks will not be blocked by an active poll().
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    | 48 ++++++-----
 .../consumer/internals/ConsumerCoordinator.java    |  8 +-
 .../consumer/internals/ConsumerNetworkClient.java  | 99 +++++++++++++++++-----
 .../java/org/apache/kafka/clients/MockClient.java  | 38 ++++++++-
 .../internals/ConsumerCoordinatorTest.java         |  9 +-
 .../internals/ConsumerNetworkClientTest.java       | 66 ++++++++++++++-
 6 files changed, 212 insertions(+), 56 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index cd5c382..5850e89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -231,7 +231,7 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (coordinator != null && client.connectionFailed(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
-                coordinatorDead();
+                markCoordinatorUnknown();
                 time.sleep(retryBackoffMs);
             }
 
@@ -487,7 +487,7 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
-                coordinatorDead();
+                markCoordinatorUnknown();
                 log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
                 future.raise(error);
             } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
@@ -550,7 +550,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                    log.debug("SyncGroup failed due to group rebalance");
+                    log.debug("SyncGroup failed because the group began another rebalance");
                     future.raise(error);
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
@@ -559,8 +559,8 @@ public abstract class AbstractCoordinator implements Closeable {
                     future.raise(error);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR) {
-                    log.debug("SyncGroup failed:", error.message());
-                    coordinatorDead();
+                    log.debug("SyncGroup failed: {}", error.message());
+                    markCoordinatorUnknown();
                     future.raise(error);
                 } else {
                     future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
@@ -627,27 +627,34 @@ public abstract class AbstractCoordinator implements Closeable {
      * @return true if the coordinator is unknown
      */
     public boolean coordinatorUnknown() {
-        return coordinator() == null;
+        return checkAndGetCoordinator() == null;
     }
 
     /**
-     * Get the current coordinator
+     * Get the coordinator if its connection is still active. Otherwise mark it unknown and
+     * return null.
+     *
      * @return the current coordinator or null if it is unknown
      */
-    protected synchronized Node coordinator() {
+    protected synchronized Node checkAndGetCoordinator() {
         if (coordinator != null && client.connectionFailed(coordinator)) {
-            coordinatorDead();
+            markCoordinatorUnknown(true);
             return null;
         }
         return this.coordinator;
     }
 
-    /**
-     * Mark the current coordinator as dead.
-     */
-    protected synchronized void coordinatorDead() {
+    private synchronized Node coordinator() {
+        return this.coordinator;
+    }
+
+    protected synchronized void markCoordinatorUnknown() {
+        markCoordinatorUnknown(false);
+    }
+
+    protected synchronized void markCoordinatorUnknown(boolean isDisconnected) {
         if (this.coordinator != null) {
-            log.info("Marking the coordinator {} dead", this.coordinator);
+            log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
             Node oldCoordinator = this.coordinator;
 
             // Mark the coordinator dead before disconnecting requests since the callbacks for any pending
@@ -656,8 +663,9 @@ public abstract class AbstractCoordinator implements Closeable {
             this.coordinator = null;
 
             // Disconnect from the coordinator to ensure that there are no in-flight requests remaining.
-            // Pending callbacks will be invoked with a DisconnectException.
-            client.disconnect(oldCoordinator);
+            // Pending callbacks will be invoked with a DisconnectException on the next call to poll.
+            if (!isDisconnected)
+                client.disconnectAsync(oldCoordinator);
         }
     }
 
@@ -708,7 +716,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 // interrupted using wakeup) and the leave group request which have been queued, but not
                 // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
                 // If coordinator is not known, requests are aborted.
-                Node coordinator = coordinator();
+                Node coordinator = checkAndGetCoordinator();
                 if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
                     log.warn("Close timed out with {} pending requests to coordinator, terminating client connections",
                             client.pendingRequestCount(coordinator));
@@ -769,7 +777,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     || error == Errors.NOT_COORDINATOR) {
                 log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
                         coordinator());
-                coordinatorDead();
+                markCoordinatorUnknown();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 log.debug("Attempt to heartbeat failed since group is rebalancing");
@@ -800,7 +808,7 @@ public abstract class AbstractCoordinator implements Closeable {
         public void onFailure(RuntimeException e, RequestFuture<T> future) {
             // mark the coordinator as dead
             if (e instanceof DisconnectException) {
-                coordinatorDead();
+                markCoordinatorUnknown(true);
             }
             future.raise(e);
         }
@@ -948,7 +956,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         } else if (heartbeat.sessionTimeoutExpired(now)) {
                             // the session timeout has expired without seeing a successful heartbeat, so we should
                             // probably make sure the coordinator is still healthy.
-                            coordinatorDead();
+                            markCoordinatorUnknown();
                         } else if (heartbeat.pollTimeoutExpired(now)) {
                             // the poll timeout has expired, which means that the foreground thread has stalled
                             // in between calls to poll(), so we explicitly leave the group.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index eebc5d5..293b00e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -695,7 +695,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (offsets.isEmpty())
             return RequestFuture.voidSuccess();
 
-        Node coordinator = coordinator();
+        Node coordinator = checkAndGetCoordinator();
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
@@ -776,7 +776,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                             || error == Errors.NOT_COORDINATOR
                             || error == Errors.REQUEST_TIMED_OUT) {
-                        coordinatorDead();
+                        markCoordinatorUnknown();
                         future.raise(error);
                         return;
                     } else if (error == Errors.UNKNOWN_MEMBER_ID
@@ -813,7 +813,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @return A request future containing the committed offsets.
      */
     private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
-        Node coordinator = coordinator();
+        Node coordinator = checkAndGetCoordinator();
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
@@ -839,7 +839,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     future.raise(error);
                 } else if (error == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
-                    coordinatorDead();
+                    markCoordinatorUnknown();
                     future.raise(error);
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 9d621b9..7d72fcc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Higher level consumer access to the network layer with basic support for request futures. This class
@@ -64,10 +65,15 @@ public class ConsumerNetworkClient implements Closeable {
     private final long unsentExpiryMs;
     private final AtomicBoolean wakeupDisabled = new AtomicBoolean();
 
+    // We do not need high throughput, so use a fair lock to try to avoid starvation
+    private final ReentrantLock lock = new ReentrantLock(true);
+
     // when requests complete, they are transferred to this queue prior to invocation. The purpose
     // is to avoid invoking them while holding this object's monitor which can open the door for deadlocks.
     private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
 
+    private final ConcurrentLinkedQueue<Node> pendingDisconnects = new ConcurrentLinkedQueue<>();
+
     // this flag allows the client to be safely woken up without waiting on the lock above. It is
     // atomic to avoid the need to acquire the lock above in order to enable it concurrently.
     private final AtomicBoolean wakeup = new AtomicBoolean(false);
@@ -110,12 +116,22 @@ public class ConsumerNetworkClient implements Closeable {
         return completionHandler.future;
     }
 
-    public synchronized Node leastLoadedNode() {
-        return client.leastLoadedNode(time.milliseconds());
+    public Node leastLoadedNode() {
+        lock.lock();
+        try {
+            return client.leastLoadedNode(time.milliseconds());
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized boolean hasReadyNodes() {
-        return client.hasReadyNodes();
+    public boolean hasReadyNodes() {
+        lock.lock();
+        try {
+            return client.hasReadyNodes();
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
@@ -224,14 +240,18 @@ public class ConsumerNetworkClient implements Closeable {
         // there may be handlers which need to be invoked if we woke up the previous call to poll
         firePendingCompletedRequests();
 
-        synchronized (this) {
+        lock.lock();
+        try {
+            // Handle async disconnects prior to attempting any sends
+            handlePendingDisconnects();
+
             // send all the requests we can send now
             trySend(now);
 
             // check whether the poll is still needed by the caller. Note that if the expected completion
             // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
             // handler), the client will be woken up.
-            if (pollCondition == null || pollCondition.shouldBlock()) {
+            if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
                 // if there are no requests in flight, do not block longer than the retry backoff
                 if (client.inFlightRequestCount() == 0)
                     timeout = Math.min(timeout, retryBackoffMs);
@@ -262,6 +282,8 @@ public class ConsumerNetworkClient implements Closeable {
 
             // clean unsent requests collection to keep the map from growing indefinitely
             unsent.clean();
+        } finally {
+            lock.unlock();
         }
 
         // called without the lock to avoid deadlock potential if handlers need to acquire locks
@@ -300,8 +322,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @return The number of pending requests
      */
     public int pendingRequestCount(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return unsent.requestCount(node) + client.inFlightRequestCount(node.idString());
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -314,8 +339,11 @@ public class ConsumerNetworkClient implements Closeable {
     public boolean hasPendingRequests(Node node) {
         if (unsent.hasRequests(node))
             return true;
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.hasInFlightRequests(node.idString());
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -325,8 +353,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @return The total count of pending requests
      */
     public int pendingRequestCount() {
-        synchronized (this) {
+        lock.lock();
+        try {
             return unsent.requestCount() + client.inFlightRequestCount();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -338,8 +369,11 @@ public class ConsumerNetworkClient implements Closeable {
     public boolean hasPendingRequests() {
         if (unsent.hasRequests())
             return true;
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.hasInFlightRequests();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -383,15 +417,25 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    public void disconnect(Node node) {
-        failUnsentRequests(node, DisconnectException.INSTANCE);
+    private void handlePendingDisconnects() {
+        lock.lock();
+        try {
+            while (true) {
+                Node node = pendingDisconnects.poll();
+                if (node == null)
+                    break;
 
-        synchronized (this) {
-            client.disconnect(node.idString());
+                failUnsentRequests(node, DisconnectException.INSTANCE);
+                client.disconnect(node.idString());
+            }
+        } finally {
+            lock.unlock();
         }
+    }
 
-        // We need to poll to ensure callbacks from in-flight requests on the disconnected socket are fired
-        pollNoWakeup();
+    public void disconnectAsync(Node node) {
+        pendingDisconnects.offer(node);
+        client.wakeup();
     }
 
     private void failExpiredRequests(long now) {
@@ -405,16 +449,16 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void failUnsentRequests(Node node, RuntimeException e) {
         // clear unsent requests to node and fail their corresponding futures
-        synchronized (this) {
+        lock.lock();
+        try {
             Collection<ClientRequest> unsentRequests = unsent.remove(node);
             for (ClientRequest unsentRequest : unsentRequests) {
                 RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback();
                 handler.onFailure(e);
             }
+        } finally {
+            lock.unlock();
         }
-
-        // called without the lock to avoid deadlock potential
-        firePendingCompletedRequests();
     }
 
     private boolean trySend(long now) {
@@ -455,8 +499,11 @@ public class ConsumerNetworkClient implements Closeable {
 
     @Override
     public void close() throws IOException {
-        synchronized (this) {
+        lock.lock();
+        try {
             client.close();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -466,8 +513,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @param node Node to connect to if possible
      */
     public boolean connectionFailed(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.connectionFailed(node);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -478,8 +528,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @param node The node to connect to
      */
     public void tryConnect(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             client.ready(node, time.milliseconds());
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8b33472..1548567 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -84,6 +85,7 @@ public class MockClient implements KafkaClient {
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
     private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+    private volatile int numBlockingWakeups = 0;
 
     public MockClient(Time time) {
         this(time, null);
@@ -187,8 +189,40 @@ public class MockClient implements KafkaClient {
         this.requests.add(request);
     }
 
+    /**
+     * Simulate a blocking poll in order to test wakeup behavior.
+     *
+     * @param numBlockingWakeups The number of polls which will block until woken up
+     */
+    public synchronized void enableBlockingUntilWakeup(int numBlockingWakeups) {
+        this.numBlockingWakeups = numBlockingWakeups;
+    }
+
+    @Override
+    public synchronized void wakeup() {
+        if (numBlockingWakeups > 0) {
+            numBlockingWakeups--;
+            notify();
+        }
+    }
+
+    private synchronized void maybeAwaitWakeup() {
+        try {
+            int remainingBlockingWakeups = numBlockingWakeups;
+            if (remainingBlockingWakeups <= 0)
+                return;
+
+            while (numBlockingWakeups == remainingBlockingWakeups)
+                wait();
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
+
     @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
+        maybeAwaitWakeup();
+
         List<ClientResponse> copy = new ArrayList<>(this.responses);
 
         if (metadata != null && metadata.updateRequested()) {
@@ -399,10 +433,6 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public void wakeup() {
-    }
-
-    @Override
     public void close() {
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2439e82..3b1b196 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -220,7 +220,8 @@ public class ConsumerCoordinatorTest {
             });
         }
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
+        consumerClient.pollNoWakeup();
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(numRequests, responses.get());
     }
@@ -237,7 +238,7 @@ public class ConsumerCoordinatorTest {
         final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
         Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = Collections.singletonMap(
                 new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L, ""));
-        consumerClient.send(coordinator.coordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
+        consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
                 .compose(new RequestFutureAdapter<ClientResponse, Object>() {
                     @Override
                     public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}
@@ -250,7 +251,8 @@ public class ConsumerCoordinatorTest {
                     }
                 });
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
+        consumerClient.pollNoWakeup();
         assertTrue(asyncCallbackInvoked.get());
     }
 
@@ -1034,6 +1036,7 @@ public class ConsumerCoordinatorTest {
 
         client.respond(offsetCommitResponse(Collections.singletonMap(t1p, error)));
         consumerClient.pollNoWakeup();
+        consumerClient.pollNoWakeup(); // second poll since coordinator disconnect is async
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 3ed8e3d..32110c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -88,7 +88,8 @@ public class ConsumerNetworkClientTest {
         RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
         assertTrue(consumerClient.hasPendingRequests(node));
         assertFalse(client.hasInFlightRequests(node.idString()));
-        consumerClient.disconnect(node);
+        consumerClient.disconnectAsync(node);
+        consumerClient.pollNoWakeup();
         assertTrue(future.failed());
         assertTrue(future.exception() instanceof DisconnectException);
     }
@@ -99,7 +100,8 @@ public class ConsumerNetworkClientTest {
         consumerClient.pollNoWakeup();
         assertTrue(consumerClient.hasPendingRequests(node));
         assertTrue(client.hasInFlightRequests(node.idString()));
-        consumerClient.disconnect(node);
+        consumerClient.disconnectAsync(node);
+        consumerClient.pollNoWakeup();
         assertTrue(future.failed());
         assertTrue(future.exception() instanceof DisconnectException);
     }
@@ -187,6 +189,66 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
+    public void testDisconnectWakesUpPoll() throws Exception {
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+
+        client.enableBlockingUntilWakeup(1);
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.poll(future);
+            }
+        };
+        t.start();
+
+        consumerClient.disconnectAsync(node);
+        t.join();
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+    }
+
+    @Test
+    public void testFutureCompletionOutsidePoll() throws Exception {
+        // Tests the scenario in which the request that is being awaited in one thread
+        // is received and completed in another thread.
+
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+        consumerClient.pollNoWakeup(); // dequeue and send the request
+
+        client.enableBlockingUntilWakeup(2);
+        Thread t1 = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.pollNoWakeup();
+            }
+        };
+        t1.start();
+
+        // Sleep a little so that t1 is blocking in poll
+        Thread.sleep(50);
+
+        Thread t2 = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.poll(future);
+            }
+        };
+        t2.start();
+
+        // Sleep a little so that t2 is awaiting the network client lock
+        Thread.sleep(50);
+
+        // Simulate a network response and return from the poll in t1
+        client.respond(heartbeatResponse(Errors.NONE));
+        client.wakeup();
+
+        // Both threads should complete since t1 should wakeup t2
+        t1.join();
+        t2.join();
+        assertTrue(future.succeeded());
+    }
+
+    @Test
     public void testAwaitForMetadataUpdateWithTimeout() {
         assertFalse(consumerClient.awaitMetadataUpdate(10L));
     }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message