kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3807; Fix transient test failure caused by race on future completion
Date Fri, 09 Sep 2016 21:08:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3759d7f76 -> 4af50bb86


KAFKA-3807; Fix transient test failure caused by race on future completion

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Dan Norwood <norwood@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1821 from hachikuji/KAFKA-3807


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4af50bb8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4af50bb8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4af50bb8

Branch: refs/heads/trunk
Commit: 4af50bb8600c37ee2e3597fba9a54a29cef94afa
Parents: 3759d7f
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Sep 9 21:44:55 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Sep 9 21:47:03 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  24 ++-
 .../internals/ConsumerNetworkClient.java        |  51 ++++-
 .../clients/consumer/internals/Fetcher.java     |  24 +++
 .../consumer/internals/RequestFuture.java       |  93 ++++++----
 .../internals/ConsumerNetworkClientTest.java    |  42 +++++
 .../clients/consumer/internals/FetcherTest.java |  30 ++-
 .../consumer/internals/RequestFutureTest.java   | 184 ++++++++++++++++++-
 7 files changed, 395 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4af50bb8/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ca8f1f1..ade4243 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -18,6 +18,7 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.PollCondition;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -989,9 +990,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     }
 
     /**
-     * Do one round of polling. In addition to checking for new data, this does any needed
-     * heart-beating, auto-commits, and offset updates.
-     * @param timeout The maximum time to block in the underlying poll
+     * Do one round of polling. In addition to checking for new data, this does any needed
offset commits
+     * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
+     * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
      * @return The fetched records (may be empty)
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long
timeout) {
@@ -1010,8 +1011,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         // send any new fetches (won't resend pending fetches)
         fetcher.sendFetches();
 
+        // if no fetches could be sent at the moment (which can happen if a partition leader
is in the
+        // blackout period following a disconnect, or if the partition leader is unknown),
then we don't
+        // block for longer than the retry backoff duration.
+        if (!fetcher.hasInFlightFetches())
+            timeout = Math.min(timeout, retryBackoffMs);
+
         long now = time.milliseconds();
-        client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);
+        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
+
+        client.poll(pollTimeout, now, new PollCondition() {
+            @Override
+            public boolean shouldBlock() {
+                // since a fetch might be completed by the background thread, we need this
poll condition
+                // to ensure that we do not block unnecessarily in poll()
+                return !fetcher.hasCompletedFetches() && fetcher.hasInFlightFetches();
+            }
+        });
 
         // after the long poll, we should check whether the group needs to rebalance
         // prior to returning data so that the group can stabilize faster

http://git-wip-us.apache.org/repos/asf/kafka/blob/4af50bb8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 07edd3c..ef78c05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -59,7 +59,7 @@ public class ConsumerNetworkClient implements Closeable {
     private int wakeupDisabledCount = 0;
 
     // when requests complete, they are transferred to this queue prior to invocation. The
purpose
-    // is to avoid invoking them while holding the lock above.
+    // is to avoid invoking them while holding this object's monitor which can open the door
for deadlocks.
     private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion
= new ConcurrentLinkedQueue<>();
 
     // this flag allows the client to be safely woken up without waiting on the lock above.
It is
@@ -105,6 +105,9 @@ public class ConsumerNetworkClient implements Closeable {
         RequestHeader header = client.nextRequestHeader(api, version);
         RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
         put(node, new ClientRequest(now, true, send, completionHandler));
+
+        // wakeup the client in case it is blocking in poll so that we can send the queued
request
+        client.wakeup();
         return completionHandler.future;
     }
 
@@ -162,7 +165,7 @@ public class ConsumerNetworkClient implements Closeable {
      */
     public void poll(RequestFuture<?> future) {
         while (!future.isDone())
-            poll(Long.MAX_VALUE);
+            poll(Long.MAX_VALUE, time.milliseconds(), future);
     }
 
     /**
@@ -177,7 +180,7 @@ public class ConsumerNetworkClient implements Closeable {
         long remaining = timeout;
         long now = begin;
         do {
-            poll(remaining, now);
+            poll(remaining, now, future);
             now = time.milliseconds();
             long elapsed = now - begin;
             remaining = timeout - elapsed;
@@ -191,7 +194,7 @@ public class ConsumerNetworkClient implements Closeable {
      * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public void poll(long timeout) {
-        poll(timeout, time.milliseconds());
+        poll(timeout, time.milliseconds(), null);
     }
 
     /**
@@ -199,7 +202,7 @@ public class ConsumerNetworkClient implements Closeable {
      * @param timeout timeout in milliseconds
      * @param now current time in milliseconds
      */
-    public void poll(long timeout, long now) {
+    public void poll(long timeout, long now, PollCondition pollCondition) {
         // there may be handlers which need to be invoked if we woke up the previous call
to poll
         firePendingCompletedRequests();
 
@@ -207,10 +210,15 @@ public class ConsumerNetworkClient implements Closeable {
             // send all the requests we can send now
             trySend(now);
 
-            // ensure we don't poll any longer than the deadline for
-            // the next scheduled task
-            client.poll(timeout, now);
-            now = time.milliseconds();
+            // check whether the poll is still needed by the caller. Note that if the expected
completion
+            // condition becomes satisfied after the call to shouldBlock() (because of a
fired completion
+            // handler), the client will be woken up.
+            if (pollCondition == null || pollCondition.shouldBlock()) {
+                client.poll(timeout, now);
+                now = time.milliseconds();
+            } else {
+                client.poll(0, now);
+            }
 
             // handle any disconnects by failing the active requests. note that disconnects
must
             // be checked immediately following poll since any subsequent call to client.ready()
@@ -240,7 +248,7 @@ public class ConsumerNetworkClient implements Closeable {
     public void pollNoWakeup() {
         disableWakeups();
         try {
-            poll(0, time.milliseconds());
+            poll(0, time.milliseconds(), null);
         } finally {
             enableWakeups();
         }
@@ -284,13 +292,19 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     private void firePendingCompletedRequests() {
+        boolean completedRequestsFired = false;
         for (;;) {
             RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
             if (completionHandler == null)
                 break;
 
             completionHandler.fireCompletion();
+            completedRequestsFired = true;
         }
+
+        // wakeup the client in case it is blocking in poll for this future's completion
+        if (completedRequestsFired)
+            client.wakeup();
     }
 
     private void checkDisconnects(long now) {
@@ -462,4 +476,21 @@ public class ConsumerNetworkClient implements Closeable {
             pendingCompletion.add(this);
         }
     }
+
+    /**
+     * When invoking poll from a multi-threaded environment, it is possible that the condition
that
+     * the caller is awaiting has already been satisfied prior to the invocation of poll.
We therefore
+     * introduce this interface to push the condition checking as close as possible to the
invocation
+     * of poll. In particular, the check will be done while holding the lock used to protect
concurrent
+     * access to {@link org.apache.kafka.clients.NetworkClient}, which means implementations
must be
+     * very careful about locking order if the callback must acquire additional locks.
+     */
+    public interface PollCondition {
+        /**
+         * Return whether the caller is still awaiting an IO event.
+         * @return true if so, false otherwise.
+         */
+        boolean shouldBlock();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4af50bb8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index eb876a5..23a8511 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -67,6 +67,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class manage the fetching process with the brokers.
@@ -87,6 +88,7 @@ public class Fetcher<K, V> {
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+    private final AtomicInteger numInFlightFetches = new AtomicInteger(0);
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
@@ -123,6 +125,23 @@ public class Fetcher<K, V> {
     }
 
     /**
+     * Return whether we have any completed fetches pending return to the user. This method
is thread-safe.
+     * @return true if there are completed fetches, false otherwise
+     */
+    public boolean hasCompletedFetches() {
+        return !completedFetches.isEmpty();
+    }
+
+    /**
+     * Check whether there are in-flight fetches. This is used to avoid unnecessary blocking
in
+     * {@link ConsumerNetworkClient#poll(long)} if there are no fetches to wait for. This
method is thread-safe.
+     * @return true if there are, false otherwise
+     */
+    public boolean hasInFlightFetches() {
+        return numInFlightFetches.get() > 0;
+    }
+
+    /**
      * Set-up a fetch request for any node that we have assigned partitions for which doesn't
already have
      * an in-flight fetch or pending fetch data.
      */
@@ -130,10 +149,14 @@ public class Fetcher<K, V> {
         for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet())
{
             final FetchRequest request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
+
+            numInFlightFetches.incrementAndGet();
             client.send(fetchTarget, ApiKeys.FETCH, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
+                            numInFlightFetches.decrementAndGet();
+
                             FetchResponse response = new FetchResponse(resp.responseBody());
                             Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                             FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors,
partitions);
@@ -151,6 +174,7 @@ public class Fetcher<K, V> {
 
                         @Override
                         public void onFailure(RuntimeException e) {
+                            numInFlightFetches.decrementAndGet();
                             log.debug("Fetch request to {} failed", fetchTarget, e);
                         }
                     });

http://git-wip-us.apache.org/repos/asf/kafka/blob/4af50bb8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index b21d13e..3a55ced 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -15,8 +15,8 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.protocol.Errors;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Result of an asynchronous request from {@link ConsumerNetworkClient}. Use {@link ConsumerNetworkClient#poll(long)}
@@ -37,28 +37,30 @@ import java.util.List;
  *
  * @param <T> Return type of the result (Can be Void if there is no response)
  */
-public class RequestFuture<T> {
-
-    private boolean isDone = false;
-    private T value;
-    private RuntimeException exception;
-    private List<RequestFutureListener<T>> listeners = new ArrayList<>();
+public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
 
+    private static final Object INCOMPLETE_SENTINEL = new Object();
+    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
+    private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners =
new ConcurrentLinkedQueue<>();
 
     /**
      * Check whether the response is ready to be handled
      * @return true if the response is ready, false otherwise
      */
     public boolean isDone() {
-        return isDone;
+        return result.get() != INCOMPLETE_SENTINEL;
     }
 
     /**
      * Get the value corresponding to this request (only available if the request succeeded)
-     * @return the value if it exists or null
+     * @return the value set in {@link #complete(Object)}
+     * @throws IllegalStateException if the future is not complete or failed
      */
+    @SuppressWarnings("unchecked")
     public T value() {
-        return value;
+        if (!succeeded())
+            throw new IllegalStateException("Attempt to retrieve value from future which
hasn't successfully completed");
+        return (T) result.get();
     }
 
     /**
@@ -66,7 +68,7 @@ public class RequestFuture<T> {
      * @return true if the request completed and was successful
      */
     public boolean succeeded() {
-        return isDone && exception == null;
+        return isDone() && !failed();
     }
 
     /**
@@ -74,36 +76,43 @@ public class RequestFuture<T> {
      * @return true if the request completed with a failure
      */
     public boolean failed() {
-        return isDone && exception != null;
+        return result.get() instanceof RuntimeException;
     }
 
     /**
      * Check if the request is retriable (convenience method for checking if
      * the exception is an instance of {@link RetriableException}.
      * @return true if it is retriable, false otherwise
+     * @throws IllegalStateException if the future is not complete or completed successfully
      */
     public boolean isRetriable() {
-        return exception instanceof RetriableException;
+        return exception() instanceof RetriableException;
     }
 
     /**
      * Get the exception from a failed result (only available if the request failed)
-     * @return The exception if it exists or null
+     * @return the exception set in {@link #raise(RuntimeException)}
+     * @throws IllegalStateException if the future is not complete or completed successfully
      */
     public RuntimeException exception() {
-        return exception;
+        if (!failed())
+            throw new IllegalStateException("Attempt to retrieve exception from future which
hasn't failed");
+        return (RuntimeException) result.get();
     }
 
     /**
      * Complete the request successfully. After this call, {@link #succeeded()} will return
true
      * and the value can be obtained through {@link #value()}.
      * @param value corresponding value (or null if there is none)
+     * @throws IllegalStateException if the future has already been completed
+     * @throws IllegalArgumentException if the argument is an instance of {@link RuntimeException}
      */
     public void complete(T value) {
-        if (isDone)
+        if (value instanceof RuntimeException)
+            throw new IllegalArgumentException("The argument to complete can not be an instance
of RuntimeException");
+
+        if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
             throw new IllegalStateException("Invalid attempt to complete a request future
which is already complete");
-        this.value = value;
-        this.isDone = true;
         fireSuccess();
     }
 
@@ -111,12 +120,15 @@ public class RequestFuture<T> {
      * Raise an exception. The request will be marked as failed, and the caller can either
      * handle the exception or throw it.
      * @param e corresponding exception to be passed to caller
+     * @throws IllegalStateException if the future has already been completed
      */
     public void raise(RuntimeException e) {
-        if (isDone)
+        if (e == null)
+            throw new IllegalArgumentException("The exception passed to raise must not be
null");
+
+        if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
             throw new IllegalStateException("Invalid attempt to complete a request future
which is already complete");
-        this.exception = e;
-        this.isDone = true;
+
         fireFailure();
     }
 
@@ -129,28 +141,35 @@ public class RequestFuture<T> {
     }
 
     private void fireSuccess() {
-        for (RequestFutureListener<T> listener : listeners)
+        T value = value();
+        while (true) {
+            RequestFutureListener<T> listener = listeners.poll();
+            if (listener == null)
+                break;
             listener.onSuccess(value);
+        }
     }
 
     private void fireFailure() {
-        for (RequestFutureListener<T> listener : listeners)
+        RuntimeException exception = exception();
+        while (true) {
+            RequestFutureListener<T> listener = listeners.poll();
+            if (listener == null)
+                break;
             listener.onFailure(exception);
+        }
     }
 
     /**
      * Add a listener which will be notified when the future completes
-     * @param listener
+     * @param listener non-null listener to add
      */
     public void addListener(RequestFutureListener<T> listener) {
-        if (isDone) {
-            if (exception != null)
-                listener.onFailure(exception);
-            else
-                listener.onSuccess(value);
-        } else {
-            this.listeners.add(listener);
-        }
+        this.listeners.add(listener);
+        if (failed())
+            fireFailure();
+        else if (succeeded())
+            fireSuccess();
     }
 
     /**
@@ -160,7 +179,7 @@ public class RequestFuture<T> {
      * @return The new future
      */
     public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S>
adapter) {
-        final RequestFuture<S> adapted = new RequestFuture<S>();
+        final RequestFuture<S> adapted = new RequestFuture<>();
         addListener(new RequestFutureListener<T>() {
             @Override
             public void onSuccess(T value) {
@@ -190,7 +209,7 @@ public class RequestFuture<T> {
     }
 
     public static <T> RequestFuture<T> failure(RuntimeException e) {
-        RequestFuture<T> future = new RequestFuture<T>();
+        RequestFuture<T> future = new RequestFuture<>();
         future.raise(e);
         return future;
     }
@@ -217,4 +236,8 @@ public class RequestFuture<T> {
         return failure(new StaleMetadataException());
     }
 
+    @Override
+    public boolean shouldBlock() {
+        return !isDone();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4af50bb8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 8dcbde2..368998c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.WakeupException;
@@ -25,8 +26,10 @@ import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
@@ -76,6 +79,45 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
+    public void doNotBlockIfPollConditionIsSatisfied() {
+        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient,
metadata, time, 100, 1000);
+
+        // expect poll, but with no timeout
+        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(0L), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+
+        EasyMock.replay(mockNetworkClient);
+
+        consumerClient.poll(Long.MAX_VALUE, time.milliseconds(), new ConsumerNetworkClient.PollCondition()
{
+            @Override
+            public boolean shouldBlock() {
+                return false;
+            }
+        });
+
+        EasyMock.verify(mockNetworkClient);
+    }
+
+    @Test
+    public void blockWhenPollConditionNotSatisfied() {
+        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient,
metadata, time, 100, 1000);
+
+        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(Long.MAX_VALUE), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+
+        EasyMock.replay(mockNetworkClient);
+
+        consumerClient.poll(Long.MAX_VALUE, time.milliseconds(), new ConsumerNetworkClient.PollCondition()
{
+            @Override
+            public boolean shouldBlock() {
+                return true;
+            }
+        });
+
+        EasyMock.verify(mockNetworkClient);
+    }
+
+    @Test
     public void wakeup() {
         RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA,
heartbeatRequest());
         consumerClient.wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4af50bb8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5c0b49c..90ddcb6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -123,15 +123,23 @@ public class FetcherTest {
 
     @Test
     public void testFetchNormal() {
-        List<ConsumerRecord<byte[], byte[]>> records;
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch
         fetcher.sendFetches();
+        assertTrue(fetcher.hasInFlightFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L,
0));
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp);
+        assertTrue(fetcher.hasCompletedFetches());
+        assertFalse(fetcher.hasInFlightFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords
= fetcher.fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp));
+
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
         assertEquals(3, records.size());
         assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching
position
         long offset = 1;
@@ -141,6 +149,24 @@ public class FetcherTest {
         }
     }
 
+    @Test
+    public void testFetchError() {
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        fetcher.sendFetches();
+        assertTrue(fetcher.hasInFlightFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(),
100L, 0));
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+        assertFalse(fetcher.hasInFlightFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords
= fetcher.fetchedRecords();
+        assertFalse(partitionRecords.containsKey(tp));
+    }
+
     private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset)
{
         return new MockClient.RequestMatcher() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/4af50bb8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
index 7372754..bf3c712 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
@@ -14,14 +14,169 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.junit.Test;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class RequestFutureTest {
 
     @Test
+    public void testBasicCompletion() {
+        RequestFuture<String> future = new RequestFuture<>();
+        String value = "foo";
+        future.complete(value);
+        assertTrue(future.isDone());
+        assertEquals(value, future.value());
+    }
+
+    @Test
+    public void testBasicFailure() {
+        RequestFuture<String> future = new RequestFuture<>();
+        RuntimeException exception = new RuntimeException();
+        future.raise(exception);
+        assertTrue(future.isDone());
+        assertEquals(exception, future.exception());
+    }
+
+    @Test
+    public void testVoidFuture() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.complete(null);
+        assertTrue(future.isDone());
+        assertNull(future.value());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testRuntimeExceptionInComplete() {
+        RequestFuture<Exception> future = new RequestFuture<>();
+        future.complete(new RuntimeException());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void invokeCompleteAfterAlreadyComplete() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.complete(null);
+        future.complete(null);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void invokeCompleteAfterAlreadyFailed() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.raise(new RuntimeException());
+        future.complete(null);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void invokeRaiseAfterAlreadyFailed() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.raise(new RuntimeException());
+        future.raise(new RuntimeException());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void invokeRaiseAfterAlreadyCompleted() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.complete(null);
+        future.raise(new RuntimeException());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void invokeExceptionAfterSuccess() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.complete(null);
+        future.exception();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void invokeValueAfterFailure() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.raise(new RuntimeException());
+        future.value();
+    }
+
+    @Test
+    public void listenerInvokedIfAddedBeforeFutureCompletion() {
+        RequestFuture<Void> future = new RequestFuture<>();
+
+        MockRequestFutureListener<Void> listener = new MockRequestFutureListener<>();
+        future.addListener(listener);
+
+        future.complete(null);
+
+        assertOnSuccessInvoked(listener);
+    }
+
+    @Test
+    public void listenerInvokedIfAddedBeforeFutureFailure() {
+        RequestFuture<Void> future = new RequestFuture<>();
+
+        MockRequestFutureListener<Void> listener = new MockRequestFutureListener<>();
+        future.addListener(listener);
+
+        future.raise(new RuntimeException());
+
+        assertOnFailureInvoked(listener);
+    }
+
+    @Test
+    public void listenerInvokedIfAddedAfterFutureCompletion() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.complete(null);
+
+        MockRequestFutureListener<Void> listener = new MockRequestFutureListener<>();
+        future.addListener(listener);
+
+        assertOnSuccessInvoked(listener);
+    }
+
+    @Test
+    public void listenerInvokedIfAddedAfterFutureFailure() {
+        RequestFuture<Void> future = new RequestFuture<>();
+        future.raise(new RuntimeException());
+
+        MockRequestFutureListener<Void> listener = new MockRequestFutureListener<>();
+        future.addListener(listener);
+
+        assertOnFailureInvoked(listener);
+    }
+
+    @Test
+    public void listenersInvokedIfAddedBeforeAndAfterFailure() {
+        RequestFuture<Void> future = new RequestFuture<>();
+
+        MockRequestFutureListener<Void> beforeListener = new MockRequestFutureListener<>();
+        future.addListener(beforeListener);
+
+        future.raise(new RuntimeException());
+
+        MockRequestFutureListener<Void> afterListener = new MockRequestFutureListener<>();
+        future.addListener(afterListener);
+
+        assertOnFailureInvoked(beforeListener);
+        assertOnFailureInvoked(afterListener);
+    }
+
+    @Test
+    public void listenersInvokedIfAddedBeforeAndAfterCompletion() {
+        RequestFuture<Void> future = new RequestFuture<>();
+
+        MockRequestFutureListener<Void> beforeListener = new MockRequestFutureListener<>();
+        future.addListener(beforeListener);
+
+        future.complete(null);
+
+        MockRequestFutureListener<Void> afterListener = new MockRequestFutureListener<>();
+        future.addListener(afterListener);
+
+        assertOnSuccessInvoked(beforeListener);
+        assertOnSuccessInvoked(afterListener);
+    }
+
+    @Test
     public void testComposeSuccessCase() {
-        RequestFuture<String> future = new RequestFuture<String>();
+        RequestFuture<String> future = new RequestFuture<>();
         RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<String,
Integer>() {
             @Override
             public void onSuccess(String value, RequestFuture<Integer> future) {
@@ -38,7 +193,7 @@ public class RequestFutureTest {
 
     @Test
     public void testComposeFailureCase() {
-        RequestFuture<String> future = new RequestFuture<String>();
+        RequestFuture<String> future = new RequestFuture<>();
         RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<String,
Integer>() {
             @Override
             public void onSuccess(String value, RequestFuture<Integer> future) {
@@ -54,4 +209,29 @@ public class RequestFutureTest {
         assertEquals(e, composed.exception());
     }
 
+    private static <T> void assertOnSuccessInvoked(MockRequestFutureListener<T>
listener) {
+        assertEquals(1, listener.numOnSuccessCalls.get());
+        assertEquals(0, listener.numOnFailureCalls.get());
+    }
+
+    private static <T> void assertOnFailureInvoked(MockRequestFutureListener<T>
listener) {
+        assertEquals(0, listener.numOnSuccessCalls.get());
+        assertEquals(1, listener.numOnFailureCalls.get());
+    }
+
+    private static class MockRequestFutureListener<T> implements RequestFutureListener<T>
{
+        private final AtomicInteger numOnSuccessCalls = new AtomicInteger(0);
+        private final AtomicInteger numOnFailureCalls = new AtomicInteger(0);
+
+        @Override
+        public void onSuccess(T value) {
+            numOnSuccessCalls.incrementAndGet();
+        }
+
+        @Override
+        public void onFailure(RuntimeException e) {
+            numOnFailureCalls.incrementAndGet();
+        }
+    }
+
 }


Mime
View raw message