kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5697; Implement new consumer poll API from KIP-266 (#4855)
Date Sat, 26 May 2018 18:50:57 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c470ff7  KAFKA-5697; Implement new consumer poll API from KIP-266 (#4855)
c470ff7 is described below

commit c470ff70d3e829c8b12f6eb6cc812c4162071a1f
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Sat May 26 13:50:51 2018 -0500

    KAFKA-5697; Implement new consumer poll API from KIP-266 (#4855)
    
    Add the new stricter-timeout version of `poll` proposed in KIP-266.
    
    The pre-existing variant `poll(long timeout)` would block indefinitely for metadata
    updates if they were needed, then it would issue a fetch and poll for `timeout` ms
    for new records. The initial indefinite metadata block caused applications to become
    stuck when the brokers became unavailable. The existence of the timeout parameter
    made the indefinite block especially unintuitive.
    
    This PR adds `poll(Duration timeout)` with the semantics:
    1. iff a metadata update is needed:
        1. send (asynchronous) metadata requests
        2. poll for metadata responses (counts against timeout)
            - if no response within timeout, **return an empty collection immediately**
    2. if there is fetch data available, **return it immediately**
    3. if there is no fetch request in flight, send fetch requests
    4. poll for fetch responses (counts against timeout)
        - if no response within timeout, **return an empty collection** (leaving async fetch request for the next poll)
        - if we get a response, **return the response**
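
    As an illustration only (not part of this change), a minimal consume loop
    against the new API might look like the sketch below. It assumes an
    already-configured `KafkaConsumer<String, String> consumer` and a
    hypothetical `process` handler:

    ```java
    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    // The new poll(Duration) bounds the total time spent waiting, including
    // metadata updates, so this loop cannot hang indefinitely when the
    // brokers are unreachable.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // An empty result may simply mean the timeout expired; any in-flight
        // fetch is left for the next call to poll.
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical application handler
        }
    }
    ```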
    
    The old method, `poll(long timeout)`, is deprecated, but we do not change its semantics, so it remains:
    1. iff a metadata update is needed:
        1. send (asynchronous) metadata requests
        2. poll for metadata responses *indefinitely until we get one*
    2. if there is fetch data available, **return it immediately**
    3. if there is no fetch request in flight, send fetch requests
    4. poll for fetch responses (counts against timeout)
        - if no response within timeout, **return an empty collection** (leaving async fetch request for the next poll)
        - if we get a response, **return the response**
    
    One notable usage is prohibited by the new `poll`: previously, you could call `poll(0)` to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior was never part of any contract, and there is no guarantee that `poll(0)` won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response.
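
    For example (a sketch, not code from this commit), the old idiom and a
    safer replacement, again assuming a configured
    `KafkaConsumer<String, String> consumer`:

    ```java
    // Old, unsafe idiom: poll(0) blocked indefinitely for a metadata update,
    // and any records it happened to return were silently dropped.
    consumer.poll(0); // deprecated by this change

    // Safer: give the consumer a bounded budget and treat whatever comes back
    // as real data instead of discarding it.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> handle(record)); // `handle` is hypothetical
    ```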
---
 .../apache/kafka/clients/consumer/Consumer.java    |  69 +++---
 .../clients/consumer/ConsumerInterceptor.java      |   6 +-
 .../consumer/ConsumerRebalanceListener.java        |   9 +-
 .../kafka/clients/consumer/ConsumerRecords.java    |   2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      | 222 +++++++++++++------
 .../kafka/clients/consumer/MockConsumer.java       |  11 +-
 .../clients/consumer/OffsetCommitCallback.java     |   2 +-
 .../consumer/internals/AbstractCoordinator.java    | 132 ++++++++----
 .../consumer/internals/ConsumerCoordinator.java    | 165 ++++++++++----
 .../consumer/internals/ConsumerNetworkClient.java  |  10 +-
 .../kafka/common/ClusterResourceListener.java      |   3 +-
 .../kafka/common/errors/WakeupException.java       |   2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 156 +++++++++++---
 .../kafka/clients/consumer/MockConsumerTest.java   |  27 +++
 .../internals/AbstractCoordinatorTest.java         |   8 +-
 .../internals/ConsumerCoordinatorTest.java         | 236 ++++++++++-----------
 .../ClientAuthenticationFailureTest.java           |   5 +-
 .../runtime/distributed/WorkerCoordinator.java     |  18 +-
 .../runtime/distributed/WorkerCoordinatorTest.java |  12 +-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |   6 +-
 .../org/apache/kafka/test/MockRestoreConsumer.java |   1 +
 21 files changed, 733 insertions(+), 369 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 0e27e1f..acb53e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 
 import java.io.Closeable;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -38,156 +39,162 @@ public interface Consumer<K, V> extends Closeable {
     /**
      * @see KafkaConsumer#assignment()
      */
-    public Set<TopicPartition> assignment();
+    Set<TopicPartition> assignment();
 
     /**
      * @see KafkaConsumer#subscription()
      */
-    public Set<String> subscription();
+    Set<String> subscription();
 
     /**
      * @see KafkaConsumer#subscribe(Collection)
      */
-    public void subscribe(Collection<String> topics);
+    void subscribe(Collection<String> topics);
 
     /**
      * @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
      */
-    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
+    void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
 
     /**
      * @see KafkaConsumer#assign(Collection)
      */
-    public void assign(Collection<TopicPartition> partitions);
+    void assign(Collection<TopicPartition> partitions);
 
     /**
     * @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
     */
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
+    void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
 
     /**
     * @see KafkaConsumer#subscribe(Pattern)
     */
-    public void subscribe(Pattern pattern);
+    void subscribe(Pattern pattern);
 
     /**
      * @see KafkaConsumer#unsubscribe()
      */
-    public void unsubscribe();
+    void unsubscribe();
 
     /**
      * @see KafkaConsumer#poll(long)
      */
-    public ConsumerRecords<K, V> poll(long timeout);
+    @Deprecated
+    ConsumerRecords<K, V> poll(long timeout);
+
+    /**
+     * @see KafkaConsumer#poll(Duration)
+     */
+    ConsumerRecords<K, V> poll(Duration timeout);
 
     /**
      * @see KafkaConsumer#commitSync()
      */
-    public void commitSync();
+    void commitSync();
 
     /**
      * @see KafkaConsumer#commitSync(Map)
      */
-    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
+    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
 
     /**
      * @see KafkaConsumer#commitAsync()
      */
-    public void commitAsync();
+    void commitAsync();
 
     /**
      * @see KafkaConsumer#commitAsync(OffsetCommitCallback)
      */
-    public void commitAsync(OffsetCommitCallback callback);
+    void commitAsync(OffsetCommitCallback callback);
 
     /**
      * @see KafkaConsumer#commitAsync(Map, OffsetCommitCallback)
      */
-    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
+    void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
 
     /**
      * @see KafkaConsumer#seek(TopicPartition, long)
      */
-    public void seek(TopicPartition partition, long offset);
+    void seek(TopicPartition partition, long offset);
 
     /**
      * @see KafkaConsumer#seekToBeginning(Collection)
      */
-    public void seekToBeginning(Collection<TopicPartition> partitions);
+    void seekToBeginning(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#seekToEnd(Collection)
      */
-    public void seekToEnd(Collection<TopicPartition> partitions);
+    void seekToEnd(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#position(TopicPartition)
      */
-    public long position(TopicPartition partition);
+    long position(TopicPartition partition);
 
     /**
      * @see KafkaConsumer#committed(TopicPartition)
      */
-    public OffsetAndMetadata committed(TopicPartition partition);
+    OffsetAndMetadata committed(TopicPartition partition);
 
     /**
      * @see KafkaConsumer#metrics()
      */
-    public Map<MetricName, ? extends Metric> metrics();
+    Map<MetricName, ? extends Metric> metrics();
 
     /**
      * @see KafkaConsumer#partitionsFor(String)
      */
-    public List<PartitionInfo> partitionsFor(String topic);
+    List<PartitionInfo> partitionsFor(String topic);
 
     /**
      * @see KafkaConsumer#listTopics()
      */
-    public Map<String, List<PartitionInfo>> listTopics();
+    Map<String, List<PartitionInfo>> listTopics();
 
     /**
      * @see KafkaConsumer#paused()
      */
-    public Set<TopicPartition> paused();
+    Set<TopicPartition> paused();
 
     /**
      * @see KafkaConsumer#pause(Collection)
      */
-    public void pause(Collection<TopicPartition> partitions);
+    void pause(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#resume(Collection)
      */
-    public void resume(Collection<TopicPartition> partitions);
+    void resume(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#offsetsForTimes(java.util.Map)
      */
-    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
+    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
 
     /**
      * @see KafkaConsumer#beginningOffsets(java.util.Collection)
      */
-    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
+    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#endOffsets(java.util.Collection)
      */
-    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
+    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#close()
      */
-    public void close();
+    void close();
 
     /**
      * @see KafkaConsumer#close(long, TimeUnit)
      */
-    public void close(long timeout, TimeUnit unit);
+    void close(long timeout, TimeUnit unit);
 
     /**
      * @see KafkaConsumer#wakeup()
      */
-    public void wakeup();
+    void wakeup();
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 2f4e310..763fe51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -35,14 +35,16 @@ import java.util.Map;
  * the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception,
  * just log the errors.
  * <p>
- * ConsumerInterceptor callbacks are called from the same thread that invokes {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}.
+ * ConsumerInterceptor callbacks are called from the same thread that invokes
+ * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
  */
 public interface ConsumerInterceptor<K, V> extends Configurable {
 
     /**
-     * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}
+     * This is called just before the records are returned by
+     * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}
      * <p>
      * This method is allowed to modify consumer records, in which case the new records will be
      * returned. There is no limitation on number of records that could be returned from this
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index ad2f61d..74e8b06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -44,7 +44,8 @@ import org.apache.kafka.common.TopicPartition;
  * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
  * consumption.
  * <p>
- * This callback will only execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
+ * This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(Duration)} call
+ * whenever partition assignment changes.
  * <p>
  * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} prior to
  * any process invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
@@ -91,7 +92,7 @@ public interface ConsumerRebalanceListener {
      * It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
      * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
      * to be raised from one these nested invocations. In this case, the exception will be propagated to the current
-     * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not
+     * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
      * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
      *
      * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
@@ -103,7 +104,7 @@ public interface ConsumerRebalanceListener {
     /**
      * A callback method the user can implement to provide handling of customized offsets on completion of a successful
      * partition re-assignment. This method will be called after the partition re-assignment completes and before the
-     * consumer starts fetching data, and only as the result of a {@link Consumer#poll(long) poll(long)} call.
+     * consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(Duration)} call.
      * <p>
      * It is guaranteed that all the processes in a consumer group will execute their
      * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
@@ -112,7 +113,7 @@ public interface ConsumerRebalanceListener {
      * It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
      * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
      * to be raised from one these nested invocations. In this case, the exception will be propagated to the current
-     * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not
+     * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
      * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
      *
      * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index f2dc9bb..4d0f62c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -29,7 +29,7 @@ import java.util.Set;
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one {@link ConsumerRecord} list for every topic
- * partition returned by a {@link Consumer#poll(long)} operation.
+ * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0b18927..e5bc5c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.PollCondition;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -55,6 +54,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -98,7 +98,7 @@ import java.util.regex.Pattern;
  * <p>
  * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
  * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
- * every time the consumer receives messages in a call to {@link #poll(long)}.
+ * every time the consumer receives messages in a call to {@link #poll(Duration)}.
  * <p>
  * The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the
  * process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit
@@ -149,7 +149,7 @@ import java.util.regex.Pattern;
  *
  * <h3><a name="failuredetection">Detecting Consumer Failures</a></h3>
  *
- * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is
+ * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(Duration)} is
  * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
  * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
  * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
@@ -168,10 +168,10 @@ import java.util.regex.Pattern;
  * The consumer provides two configuration settings to control the behavior of the poll loop:
  * <ol>
  *     <li><code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give
- *     the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback
+ *     the consumer more time to handle a batch of records returned from {@link #poll(Duration)}. The drawback
  *     is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
  *     inside the call to poll. You can use this setting to bound the time to finish a rebalance, but
- *     you risk slower progress if the consumer cannot actually call {@link #poll(long) poll} often enough.</li>
+ *     you risk slower progress if the consumer cannot actually call {@link #poll(Duration) poll} often enough.</li>
  *     <li><code>max.poll.records</code>: Use this setting to limit the total records returned from a single
  *     call to poll. This can make it easier to predict the maximum that must be handled within each poll
  *     interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the
@@ -180,12 +180,12 @@ import java.util.regex.Pattern;
  * <p>
  * For use cases where message processing time varies unpredictably, neither of these options may be sufficient.
  * The recommended way to handle these cases is to move message processing to another thread, which allows
- * the consumer to continue calling {@link #poll(long) poll} while the processor is still working. Some care must be taken
- * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
- * commits and manually commit processed offsets for records only after the thread has finished handling them
- * (depending on the delivery semantics you need). Note also that you will need to {@link #pause(Collection) pause}
- * the partition so that no new records are received from poll until after thread has finished handling those
- * previously returned.
+ * the consumer to continue calling {@link #poll(Duration) poll} while the processor is still working.
+ * Some care must be taken to ensure that committed offsets do not get ahead of the actual position.
+ * Typically, you must disable automatic commits and manually commit processed offsets for records only after the
+ * thread has finished handling them (depending on the delivery semantics you need).
+ * Note also that you will need to {@link #pause(Collection) pause} the partition so that no new records are received
+ * from poll until after the thread has finished handling those previously returned.
  *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
@@ -258,7 +258,8 @@ import java.util.regex.Pattern;
  *
  * In this example we will consume a batch of records and batch them up in memory. When we have enough records
  * batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records
- * would be considered consumed after they were returned to the user in {@link #poll(long) poll}. It would then be possible
+ * would be considered consumed after they were returned to the user in {@link #poll(Duration) poll}. It would then be
+ * possible
  * for our process to fail after batching the records, but before they had been inserted into the database.
  * <p>
  * To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the
@@ -270,7 +271,7 @@ import java.util.regex.Pattern;
  * time but in failure cases could be duplicated.
  * <p>
  * <b>Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that
- * you must consume all data returned from each call to {@link #poll(long)} before any subsequent calls, or before
+ * you must consume all data returned from each call to {@link #poll(Duration)} before any subsequent calls, or before
  * {@link #close() closing} the consumer. If you fail to do either of these, it is possible for the committed offset
  * to get ahead of the consumed position, which results in missing records. The advantage of using manual offset
  * control is that you have direct control over when a record is considered "consumed."</b>
@@ -325,7 +326,7 @@ import java.util.regex.Pattern;
  *     consumer.assign(Arrays.asList(partition0, partition1));
  * </pre>
  *
- * Once assigned, you can call {@link #poll(long) poll} in a loop, just as in the preceding examples to consume
+ * Once assigned, you can call {@link #poll(Duration) poll} in a loop, just as in the preceding examples to consume
  * records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions
  * will only change with another call to {@link #assign(Collection) assign}. Manual partition assignment does
  * not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer
@@ -417,7 +418,7 @@ import java.util.regex.Pattern;
  * <p>
  * Kafka supports dynamic controlling of consumption flows by using {@link #pause(Collection)} and {@link #resume(Collection)}
  * to pause the consumption on the specified assigned partitions and resume the consumption
- * on the specified paused partitions respectively in the future {@link #poll(long)} calls.
+ * on the specified paused partitions respectively in the future {@link #poll(Duration)} calls.
  *
  * <h3>Reading Transactional Messages</h3>
  *
@@ -468,7 +469,7 @@ import java.util.regex.Pattern;
  *         try {
  *             consumer.subscribe(Arrays.asList("topic"));
  *             while (!closed.get()) {
- *                 ConsumerRecords records = consumer.poll(10000);
+ *                 ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
  *                 // Handle new records
  *             }
  *         } catch (WakeupException e) {
@@ -574,6 +575,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     // refcount is used to allow reentrant access by the thread who has acquired currentThread
     private final AtomicInteger refcount = new AtomicInteger(0);
 
+    // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
+    private boolean cachedSubscriptionHasAllFetchPositions;
+
     /**
      * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >here</a>. Values can be
@@ -878,7 +882,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * When any of these events are triggered, the provided listener will be invoked first to indicate that
      * the consumer's assignment has been revoked, and then again when the new assignment has been received.
-     * Note that rebalances will only occur during an active call to {@link #poll(long)}, so callbacks will
+     * Note that rebalances will only occur during an active call to {@link #poll(Duration)}, so callbacks will
      * also only be invoked during that time.
      *
      * The provided listener will immediately override any listener set in a previous call to subscribe.
@@ -954,7 +958,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
      * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
      * is a change to the topics matching the provided pattern and when consumer group membership changes.
-     * Group rebalances only take place during an active call to {@link #poll(long)}.
+     * Group rebalances only take place during an active call to {@link #poll(Duration)}.
      *
      * @param pattern Pattern to subscribe to
      * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
@@ -1096,22 +1100,82 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws java.lang.IllegalArgumentException if the timeout value is negative
      * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
      *             partitions to consume from
+     *
+     *
+     * @deprecated Since 2.0. Use {@link #poll(Duration)} to poll for records.
+     */
+    @Deprecated
+    @Override
+    public ConsumerRecords<K, V> poll(final long timeout) {
+        return poll(timeout, false);
+    }
+
+    /**
+     * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
+     * subscribed to any topics or partitions before polling for data.
+     * <p>
+     * On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
+     * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
+     * offset for the subscribed list of partitions.
+     *
+     *
+     * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
+     *
+     * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
+     *
+     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
+     *             partitions is undefined or out of range and no offset reset policy has been configured
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
+     *             topics or to the configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
+     *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
+     * @throws java.lang.IllegalArgumentException if the timeout value is negative
+     * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
+     *             partitions to consume from
+     * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
      */
     @Override
-    public ConsumerRecords<K, V> poll(long timeout) {
+    public ConsumerRecords<K, V> poll(final Duration timeout) {
+        return poll(timeout.toMillis(), true);
+    }
+
+    private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
         acquireAndEnsureOpen();
         try {
-            if (timeout < 0)
-                throw new IllegalArgumentException("Timeout must not be negative");
+            if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");
 
-            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
+            }
 
             // poll for new data until the timeout expires
-            long start = time.milliseconds();
-            long remaining = timeout;
+            long elapsedTime = 0L;
             do {
-                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
+
+                client.maybeTriggerWakeup();
+
+                final long metadataEnd;
+                if (includeMetadataInTimeout) {
+                    final long metadataStart = time.milliseconds();
+                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
+                        return ConsumerRecords.empty();
+                    }
+                    metadataEnd = time.milliseconds();
+                    elapsedTime += metadataEnd - metadataStart;
+                } else {
+                    while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
+                        log.warn("Still waiting for metadata");
+                    }
+                    metadataEnd = time.milliseconds();
+                }
+
+                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+
                 if (!records.isEmpty()) {
                     // before returning the fetched records, we can send off the next round of fetches
                     // and avoid block waiting for their responses to enable pipelining while the user
@@ -1119,15 +1183,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     //
                     // NOTE: since the consumed position has already been updated, we must not allow
                     // wakeups or any other errors to be triggered prior to returning the fetched records.
-                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
+                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                         client.pollNoWakeup();
+                    }
 
                     return this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }
+                final long fetchEnd = time.milliseconds();
+                elapsedTime += fetchEnd - metadataEnd;
 
-                long elapsed = time.milliseconds() - start;
-                remaining = timeout - elapsed;
-            } while (remaining > 0);
+            } while (elapsedTime < timeoutMs);
 
             return ConsumerRecords.empty();
         } finally {
@@ -1136,56 +1201,61 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Do one round of polling. In addition to checking for new data, this does any needed offset commits
-     * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
-     * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
-     * @return The fetched records (may be empty)
+     * Visible for testing
      */
-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
-        client.maybeTriggerWakeup();
+    boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
+        final long startMs = time.milliseconds();
+        if (!coordinator.poll(timeoutMs)) {
+            return false;
+        }
 
-        long startMs = time.milliseconds();
-        coordinator.poll(startMs, timeout);
+        return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
+    }
 
-        // Lookup positions of assigned partitions
-        boolean hasAllFetchPositions = updateFetchPositions();
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
+        final long startMs = time.milliseconds();
+        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);
 
         // if data is available already, return it immediately
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-        if (!records.isEmpty())
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
+        if (!records.isEmpty()) {
             return records;
+        }
 
         // send any new fetches (won't resend pending fetches)
         fetcher.sendFetches();
 
-        long nowMs = time.milliseconds();
-        long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
-        long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);
-
         // We do not want to be stuck blocking in poll if we are missing some positions
         // since the offset lookup may be backing off after a failure
-        if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
             pollTimeout = retryBackoffMs;
+        }
 
-        client.poll(pollTimeout, nowMs, new PollCondition() {
-            @Override
-            public boolean shouldBlock() {
-                // since a fetch might be completed by the background thread, we need this poll condition
-                // to ensure that we do not block unnecessarily in poll()
-                return !fetcher.hasCompletedFetches();
-            }
+        client.poll(pollTimeout, startMs, () -> {
+            // since a fetch might be completed by the background thread, we need this poll condition
+            // to ensure that we do not block unnecessarily in poll()
+            return !fetcher.hasCompletedFetches();
         });
 
         // after the long poll, we should check whether the group needs to rebalance
         // prior to returning data so that the group can stabilize faster
-        if (coordinator.needRejoin())
+        if (coordinator.rejoinNeededOrPending()) {
             return Collections.emptyMap();
+        }
 
         return fetcher.fetchedRecords();
     }
 
+    private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsedTime) {
+        return Math.max(0, timeoutMs - elapsedTime);
+    }
+
     /**
-     * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partitions.
+     * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
+     * partitions.
      * <p>
      * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -1260,7 +1330,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partition.
+     * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and partition.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
      */
     @Override
@@ -1269,7 +1339,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commit offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * Commit offsets returned on the last {@link #poll(Duration) poll()} for the subscribed list of topics and partitions.
      * <p>
      * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -1327,7 +1397,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
+     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
      *
@@ -1350,7 +1420,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
-     * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
+     * first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
      * If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
      *
      * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
@@ -1373,7 +1443,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
-     * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
+     * final offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
      * If no partitions are provided, seek to the final offset for all of the currently assigned partitions.
      * <p>
      * If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
@@ -1426,7 +1496,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Long offset = this.subscriptions.position(partition);
             while (offset == null) {
                 // batch update fetch positions for any partitions without a valid position
-                updateFetchPositions();
+                while (!updateFetchPositions(Long.MAX_VALUE)) {
+                    log.warn("Still updating fetch positions");
+                }
                 client.poll(retryBackoffMs);
                 offset = this.subscriptions.position(partition);
             }
@@ -1457,7 +1529,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public OffsetAndMetadata committed(TopicPartition partition) {
         acquireAndEnsureOpen();
         try {
-            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
+            Map<TopicPartition, OffsetAndMetadata> offsets = null;
+            while (offsets == null) {
+                offsets = coordinator.fetchCommittedOffsets(
+                    Collections.singleton(partition),
+                    Long.MAX_VALUE
+                );
+            }
             return offsets.get(partition);
         } finally {
             release();
@@ -1477,6 +1555,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * does not already have any metadata about the given topic.
      *
      * @param topic The topic to get partition metadata for
+     *
      * @return The list of partitions
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
@@ -1510,6 +1589,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * remote call to the server.
 
      * @return The map of topics and its partitions
+     *
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1529,7 +1609,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
+     * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
      * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
      * Note that this method does not affect partition subscription. In particular, it does not cause a group
      * rebalance when automatic assignment is used.
@@ -1551,7 +1631,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Resume specified partitions which have been paused with {@link #pause(Collection)}. New calls to
-     * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
+     * {@link #poll(Duration)} will return records from these partitions if there are any to be fetched.
      * If the partitions were not previously paused, this method is a no-op.
      * @param partitions The partitions which should be resumed
      * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
@@ -1593,6 +1673,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * will be returned for that partition.
      *
      * @param timestampsToSearch the mapping from partition to the timestamp to look up.
+     *
      * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
      *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
      *         such message.
@@ -1772,18 +1853,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
-     * @return true if all assigned positions have a position, false otherwise
+     * @return true iff the operation completed without timing out
      */
-    private boolean updateFetchPositions() {
-        if (subscriptions.hasAllFetchPositions())
-            return true;
+    private boolean updateFetchPositions(final long timeoutMs) {
+        cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+        if (cachedSubscriptionHasAllFetchPositions) return true;
 
         // If there are any partitions which do not have a valid position and are not
         // awaiting reset, then we need to fetch committed offsets. We will only do a
         // coordinator lookup if there are partitions which have missing positions, so
         // a consumer with manually assigned partitions can avoid a coordinator dependence
         // by always ensuring that assigned partitions have an initial position.
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        if (!coordinator.refreshCommittedOffsetsIfNeeded(timeoutMs)) return false;
 
         // If there are partitions still needing a position and a reset policy is defined,
         // request reset using the default policy. If no reset strategy is defined and there
@@ -1794,7 +1875,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // partitions which are awaiting reset.
         fetcher.resetOffsetsIfNeeded();
 
-        return false;
+        return true;
     }
 
     /**
@@ -1835,4 +1916,5 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             throw new IllegalStateException("Must configure at least one partition assigner class name to " +
                 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
     }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ceb7024..479a9ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -43,7 +44,7 @@ import java.util.regex.Pattern;
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
  * threadsafe </i>. However, you can use the {@link #schedulePollTask(Runnable)} method to write multithreaded tests
- * where a driver thread waits for {@link #poll(long)} to be called by a background thread and then can safely perform
+ * where a driver thread waits for {@link #poll(Duration)} to be called by a background thread and then can safely perform
  * operations during a callback.
  */
 public class MockConsumer<K, V> implements Consumer<K, V> {
@@ -146,8 +147,14 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         subscriptions.unsubscribe();
     }
 
+    @Deprecated
     @Override
     public synchronized ConsumerRecords<K, V> poll(long timeout) {
+        return poll(Duration.ZERO);
+    }
+
+    @Override
+    public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         ensureNotClosed();
 
         // Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in
@@ -401,7 +408,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Schedule a task to be executed during a poll(). One enqueued task will be executed per {@link #poll(long)}
+     * Schedule a task to be executed during a poll(). One enqueued task will be executed per {@link #poll(Duration)}
      * invocation. You can use this repeatedly to mock out multiple responses to poll invocations.
      * @param task the task to be executed
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
index b217a63..383c1c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 /**
  * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
- * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
+ * may be executed in any thread calling {@link Consumer#poll(java.time.Duration) poll()}.
  */
 public interface OffsetCommitCallback {
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index dd4bb70..adbaae7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -58,6 +58,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -198,46 +199,44 @@ public abstract class AbstractCoordinator implements Closeable {
                                            ByteBuffer memberAssignment);
 
     /**
-     * Block until the coordinator for this group is known and is ready to receive requests.
-     */
-    public synchronized void ensureCoordinatorReady() {
-        // Using zero as current time since timeout is effectively infinite
-        ensureCoordinatorReady(0, Long.MAX_VALUE);
-    }
-
-    /**
+     * Visible for testing.
+     *
      * Ensure that the coordinator is ready to receive requests.
-     * @param startTimeMs Current time in milliseconds
+     *
      * @param timeoutMs Maximum time to wait to discover the coordinator
      * @return true If coordinator discovery and initial connection succeeded, false otherwise
      */
-    protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
-        long remainingMs = timeoutMs;
+    protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
+        final long startTimeMs = time.milliseconds();
+        long elapsedTime = 0L;
 
         while (coordinatorUnknown()) {
-            RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, remainingMs);
+            final RequestFuture<Void> future = lookupCoordinator();
+            client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+            if (!future.isDone()) {
+                // ran out of time
+                break;
+            }
 
             if (future.failed()) {
                 if (future.isRetriable()) {
-                    remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
-                    if (remainingMs <= 0)
-                        break;
+                    elapsedTime = time.milliseconds() - startTimeMs;
+
+                    if (elapsedTime >= timeoutMs) break;
 
                     log.debug("Coordinator discovery failed, refreshing metadata");
-                    client.awaitMetadataUpdate(remainingMs);
+                    client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+                    elapsedTime = time.milliseconds() - startTimeMs;
                 } else
                     throw future.exception();
             } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
-                time.sleep(retryBackoffMs);
+                final long sleepTime = Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+                time.sleep(sleepTime);
+                elapsedTime += sleepTime;
             }
-
-            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
-            if (remainingMs <= 0)
-                break;
         }
 
         return !coordinatorUnknown();
@@ -261,15 +260,14 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     /**
-     * Check whether the group should be rejoined (e.g. if metadata changes)
+     * Check whether the group should be rejoined (e.g. if metadata changes) or whether a
+     * rejoin request is already in flight and needs to be completed.
+     *
      * @return true if it should, false otherwise
      */
-    protected synchronized boolean needRejoin() {
-        return rejoinNeeded;
-    }
-
-    private synchronized boolean rejoinIncomplete() {
-        return joinFuture != null;
+    protected synchronized boolean rejoinNeededOrPending() {
+        // if there's a pending joinFuture, we should try to complete handling it.
+        return rejoinNeeded || joinFuture != null;
     }
 
     /**
@@ -309,11 +307,28 @@ public abstract class AbstractCoordinator implements Closeable {
      * Ensure that the group is active (i.e. joined and synced)
      */
     public void ensureActiveGroup() {
+        while (!ensureActiveGroup(Long.MAX_VALUE)) {
+            log.warn("still waiting to ensure active group");
+        }
+    }
+
+    /**
+     * Ensure the group is active (i.e., joined and synced)
+     *
+     * @param timeoutMs A time budget for ensuring the group is active
+     * @return true iff the group is active
+     */
+    boolean ensureActiveGroup(final long timeoutMs) {
+        final long startTime = time.milliseconds();
         // always ensure that the coordinator is ready because we may have been disconnected
         // when sending heartbeats and does not necessarily require us to rejoin the group.
-        ensureCoordinatorReady();
+        if (!ensureCoordinatorReady(timeoutMs)) {
+            return false;
+        }
+
         startHeartbeatThreadIfNeeded();
-        joinGroupIfNeeded();
+
+        return joinGroupIfNeeded(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startTime));
     }
 
     private synchronized void startHeartbeatThreadIfNeeded() {
@@ -345,10 +360,23 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }
 
-    // visible for testing. Joins the group without starting the heartbeat thread.
-    void joinGroupIfNeeded() {
-        while (needRejoin() || rejoinIncomplete()) {
-            ensureCoordinatorReady();
+    /**
+     * Joins the group without starting the heartbeat thread.
+     *
+     * Visible for testing.
+     *
+     * @param timeoutMs The amount of time, in ms, allotted for this operation
+     * @return true iff the operation succeeded
+     */
+    boolean joinGroupIfNeeded(final long timeoutMs) {
+        final long startTime = time.milliseconds();
+        long elapsedTime = 0L;
+
+        while (rejoinNeededOrPending()) {
+            if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
+                return false;
+            }
+            elapsedTime = time.milliseconds() - startTime;
 
             // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
             // time if the client is woken up before a pending rebalance completes. This must be called
@@ -360,8 +388,12 @@ public abstract class AbstractCoordinator implements Closeable {
                 needsJoinPrepare = false;
             }
 
-            RequestFuture<ByteBuffer> future = initiateJoinGroup();
-            client.poll(future);
+            final RequestFuture<ByteBuffer> future = initiateJoinGroup();
+            client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+            if (!future.isDone()) {
+                // we ran out of time
+                return false;
+            }
 
             if (future.succeeded()) {
                 onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
@@ -372,7 +404,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 needsJoinPrepare = true;
             } else {
                 resetJoinGroupFuture();
-                RuntimeException exception = future.exception();
+                final RuntimeException exception = future.exception();
                 if (exception instanceof UnknownMemberIdException ||
                         exception instanceof RebalanceInProgressException ||
                         exception instanceof IllegalGenerationException)
@@ -381,7 +413,16 @@ public abstract class AbstractCoordinator implements Closeable {
                     throw exception;
                 time.sleep(retryBackoffMs);
             }
+
+            if (rejoinNeededOrPending()) {
+                elapsedTime = time.milliseconds() - startTime;
+            }
         }
+        return true;
+    }
+
+    private long remainingTimeAtLeastZero(final long timeout, final long elapsedTime) {
+        return Math.max(0, timeout - elapsedTime);
     }
 
     private synchronized void resetJoinGroupFuture() {
@@ -1035,6 +1076,21 @@ public abstract class AbstractCoordinator implements Closeable {
             this.memberId = memberId;
             this.protocol = protocol;
         }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final Generation that = (Generation) o;
+            return generationId == that.generationId &&
+                Objects.equals(memberId, that.memberId) &&
+                Objects.equals(protocol, that.protocol);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(generationId, memberId, protocol);
+        }
     }
 
     private static class UnjoinedGroupException extends RetriableException {
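
The AbstractCoordinator changes above all follow one time-budget discipline: record a start time, keep elapsedTime up to date, hand each blocking call only the remainder via remainingTimeAtLeastZero, and return false once the budget is spent rather than blocking indefinitely. The standalone sketch below illustrates that pattern in isolation; the class and its blockingStep method are hypothetical stand-ins, not Kafka code, while remainingTimeAtLeastZero mirrors the helper added in this patch.

    import java.util.concurrent.ThreadLocalRandom;

    // Standalone sketch of the time-budget pattern used above; not Kafka code.
    public class TimeBudgetSketch {

        // Mirrors the remainingTimeAtLeastZero helper added in this patch.
        static long remainingTimeAtLeastZero(final long timeoutMs, final long elapsedMs) {
            return Math.max(0, timeoutMs - elapsedMs);
        }

        // Hypothetical blocking step: waits up to timeoutMs and reports success or failure.
        static boolean blockingStep(final long timeoutMs) throws InterruptedException {
            Thread.sleep(Math.min(timeoutMs, ThreadLocalRandom.current().nextLong(20, 100)));
            return ThreadLocalRandom.current().nextBoolean();
        }

        // Retries the step, charging every attempt against a single overall budget,
        // and reports failure instead of blocking indefinitely.
        static boolean runWithBudget(final long timeoutMs) throws InterruptedException {
            final long startMs = System.currentTimeMillis();
            long elapsedMs = 0L;
            while (!blockingStep(remainingTimeAtLeastZero(timeoutMs, elapsedMs))) {
                elapsedMs = System.currentTimeMillis() - startMs;
                if (elapsedMs >= timeoutMs)
                    return false; // budget exhausted
            }
            return true;
        }

        public static void main(String[] args) throws InterruptedException {
            System.out.println("completed within budget: " + runWithBudget(500L));
        }
    }
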
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index eec070e..e8c5bc6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Measurable;
@@ -86,6 +87,32 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private MetadataSnapshot assignmentSnapshot;
     private long nextAutoCommitDeadline;
 
+    // hold onto request&future for commited offset requests to enable async calls.
+    private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+
+    private static class PendingCommittedOffsetRequest {
+        private final Set<TopicPartition> request;
+        private final Generation generation;
+        private final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response;
+
+        private PendingCommittedOffsetRequest(final Set<TopicPartition> request,
+                                              final Generation generationAtRequestTime,
+                                              final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response
+        ) {
+            if (request == null) throw new NullPointerException();
+            if (response == null) throw new NullPointerException();
+
+            this.request = request;
+            this.generation = generationAtRequestTime;
+            this.response = response;
+        }
+
+        private boolean sameRequest(final Set<TopicPartition> currentRequest, final Generation currentGeneration) {
+            return (generation == null ? currentGeneration == null : generation.equals(currentGeneration))
+                && request.equals(currentRequest);
+        }
+    }
+
     /**
      * Initialize the coordination manager.
      */
@@ -243,7 +270,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // update the metadata and enforce a refresh to make sure the fetcher can start
         // fetching data in the next iteration
         this.metadata.setTopics(subscriptions.groupSubscription());
-        client.ensureFreshMetadata();
+        if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException();
 
         // give the assignor a chance to update internal state based on the received assignment
         assignor.onAssignment(assignment);
@@ -268,30 +295,43 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
      * has joined the group (if it is using group management). This also handles periodic offset commits
      * if they are enabled.
+     * <p>
+     * Returns early if the timeout expires
      *
-     * @param now current time in milliseconds
+     * @param timeoutMs The amount of time, in ms, allotted for this operation.
+     * @return true iff the operation succeeded
      */
-    public void poll(long now, long remainingMs) {
+    public boolean poll(final long timeoutMs) {
+        final long startTime = time.milliseconds();
+        long elapsed = 0L;
+
         invokeCompletedOffsetCommitCallbacks();
 
         if (subscriptions.partitionsAutoAssigned()) {
             if (coordinatorUnknown()) {
-                ensureCoordinatorReady();
-                now = time.milliseconds();
+                if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
+                    return false;
+                }
+                elapsed = time.milliseconds() - startTime;
             }
 
-            if (needRejoin()) {
+            if (rejoinNeededOrPending()) {
                 // due to a race condition between the initial metadata fetch and the initial rebalance,
                 // we need to ensure that the metadata is fresh before joining initially. This ensures
                 // that we have matched the pattern against the cluster's topics at least once before joining.
-                if (subscriptions.hasPatternSubscription())
-                    client.ensureFreshMetadata();
+                if (subscriptions.hasPatternSubscription()) {
+                    if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
+                        return false;
+                    }
+                    elapsed = time.milliseconds() - startTime;
+                }
 
-                ensureActiveGroup();
-                now = time.milliseconds();
+                if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
+                    return false;
+                }
             }
 
-            pollHeartbeat(now);
+            pollHeartbeat(startTime);
         } else {
             // For manually assigned partitions, if there are no ready nodes, await metadata.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
@@ -301,18 +341,23 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             // When group management is used, metadata wait is already performed for this scenario as
             // coordinator is unknown, hence this check is not required.
             if (metadata.updateRequested() && !client.hasReadyNodes()) {
-                boolean metadataUpdated = client.awaitMetadataUpdate(remainingMs);
-                if (!metadataUpdated && !client.hasReadyNodes())
-                    return;
-                now = time.milliseconds();
+                final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
+                if (!metadataUpdated && !client.hasReadyNodes()) {
+                    return false;
+                }
             }
         }
 
-        maybeAutoCommitOffsetsAsync(now);
+        maybeAutoCommitOffsetsAsync(startTime);
+        return true;
+    }
+
+    private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsed) {
+        return Math.max(0, timeoutMs - elapsed);
     }
 
     /**
-     * Return the time to the next needed invocation of {@link #poll(long, long)}.
+     * Return the time to the next needed invocation of {@link #poll(long)}.
      * @param now current time in milliseconds
      * @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
      */
@@ -349,7 +394,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // update metadata (if needed) and keep track of the metadata used for assignment so that
         // we can check after rebalance completion whether anything has changed
-        client.ensureFreshMetadata();
+        if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException();
 
         isLeader = true;
 
@@ -385,7 +430,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             allSubscribedTopics.addAll(assignedTopics);
             this.subscriptions.groupSubscribe(allSubscribedTopics);
             metadata.setTopics(this.subscriptions.groupSubscription());
-            client.ensureFreshMetadata();
+            if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException();
         }
 
         assignmentSnapshot = metadataSnapshot;
@@ -423,7 +468,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     @Override
-    public boolean needRejoin() {
+    public boolean rejoinNeededOrPending() {
         if (!subscriptions.partitionsAutoAssigned())
             return false;
 
@@ -435,60 +480,94 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
             return true;
 
-        return super.needRejoin();
+        return super.rejoinNeededOrPending();
     }
 
     /**
      * Refresh the committed offsets for provided partitions.
+     *
+     * @param timeoutMs A time limit for this operation
+     * @return true iff the operation completed within the timeout
      */
-    public void refreshCommittedOffsetsIfNeeded() {
-        Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
-        Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions);
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-            TopicPartition tp = entry.getKey();
-            long offset = entry.getValue().offset();
+    public boolean refreshCommittedOffsetsIfNeeded(final long timeoutMs) {
+        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timeoutMs);
+        if (offsets == null) return false;
+
+        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            final TopicPartition tp = entry.getKey();
+            final long offset = entry.getValue().offset();
             log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
             this.subscriptions.seek(tp, offset);
         }
+        return true;
     }
 
     /**
      * Fetch the current committed offsets from the coordinator for a set of partitions.
+     *
      * @param partitions The partitions to fetch offsets for
-     * @return A map from partition to the committed offset
+     * @return A map from partition to the committed offset, or null if the operation timed out
      */
-    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
+    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<TopicPartition> partitions,
+                                                                        final long timeoutMs) {
+        if (partitions.isEmpty()) return Collections.emptyMap();
+
+        final Generation generation = generation();
+        if (pendingCommittedOffsetRequest != null && !pendingCommittedOffsetRequest.sameRequest(partitions, generation)) {
+            // if we were waiting for a different request, then just clear it.
+            pendingCommittedOffsetRequest = null;
+        }
+
+        final long startMs = time.milliseconds();
+        long elapsedTime = 0L;
 
         while (true) {
-            ensureCoordinatorReady();
+            if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) return null;
+            elapsedTime = time.milliseconds() - startMs;
 
             // contact coordinator to fetch committed offsets
-            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
-            client.poll(future);
+            final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+            if (pendingCommittedOffsetRequest != null) {
+                future = pendingCommittedOffsetRequest.response;
+            } else {
+                future = sendOffsetFetchRequest(partitions);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generation, future);
 
-            if (future.succeeded())
-                return future.value();
+            }
+            client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
 
-            if (!future.isRetriable())
-                throw future.exception();
+            if (future.isDone()) {
+                pendingCommittedOffsetRequest = null;
 
-            time.sleep(retryBackoffMs);
+                if (future.succeeded()) {
+                    return future.value();
+                } else if (!future.isRetriable()) {
+                    throw future.exception();
+                } else {
+                    elapsedTime = time.milliseconds() - startMs;
+                    final long sleepTime = Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+                    time.sleep(sleepTime);
+                    elapsedTime += sleepTime;
+                }
+            } else {
+                return null;
+            }
         }
     }
 
-    public void close(long timeoutMs) {
+    public void close(final long timeoutMs) {
         // we do not need to re-enable wakeups since we are closing already
         client.disableWakeups();
 
         long now = time.milliseconds();
-        long endTimeMs = now + timeoutMs;
+        final long endTimeMs = now + timeoutMs;
         try {
             maybeAutoCommitOffsetsSync(timeoutMs);
             now = time.milliseconds();
             if (pendingAsyncCommits.get() > 0 && endTimeMs > now) {
-                ensureCoordinatorReady(now, endTimeMs - now);
+                ensureCoordinatorReady(endTimeMs - now);
                 now = time.milliseconds();
             }
         } finally {
@@ -587,7 +666,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         long remainingMs = timeoutMs;
         do {
             if (coordinatorUnknown()) {
-                if (!ensureCoordinatorReady(now, remainingMs))
+                if (!ensureCoordinatorReady(remainingMs))
                     return false;
 
                 remainingMs = timeoutMs - (time.milliseconds() - startMs);
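
Besides converting poll and fetchCommittedOffsets to return a success indicator (or null) when the budget runs out, the ConsumerCoordinator changes make the offset fetch resumable: the in-flight future is stashed in pendingCommittedOffsetRequest and reused on the next call whenever the requested partitions and generation still match, so a timed-out caller continues waiting on the same request instead of re-sending it. Below is a hedged sketch of that request-caching idea with hypothetical names, using CompletableFuture as a stand-in for Kafka's RequestFuture.

    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    // Illustrative only: cache an in-flight request so that a caller which ran
    // out of time can resume waiting on the same future on its next attempt,
    // instead of re-sending the request.
    final class ResumableRequestCache<K, V> {
        private K pendingKey;
        private CompletableFuture<V> pendingFuture;

        // Reuse the pending future iff the key (e.g., partitions + generation)
        // matches; otherwise drop the stale request and send a fresh one.
        synchronized CompletableFuture<V> getOrSend(final K key,
                                                    final Function<K, CompletableFuture<V>> send) {
            if (pendingFuture != null && !Objects.equals(pendingKey, key)) {
                pendingKey = null;
                pendingFuture = null;
            }
            if (pendingFuture == null) {
                pendingKey = key;
                pendingFuture = send.apply(key);
            }
            return pendingFuture;
        }

        // Once the pending future completes, forget it so the next call starts over.
        synchronized void clearIfDone() {
            if (pendingFuture != null && pendingFuture.isDone()) {
                pendingKey = null;
                pendingFuture = null;
            }
        }
    }
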
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index e1598fa..7a9f717 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -164,9 +164,13 @@ public class ConsumerNetworkClient implements Closeable {
      * Ensure our metadata is fresh (if an update is expected, this will block
      * until it has completed or the given timeout expires).
      *
      * @return true iff the metadata is fresh when this method returns
      */
-    public void ensureFreshMetadata() {
-        if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
-            awaitMetadataUpdate();
+    boolean ensureFreshMetadata(final long timeout) {
+        if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0) {
+            return awaitMetadataUpdate(timeout);
+        } else {
+            // the metadata is already fresh
+            return true;
+        }
     }
 
     /**
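
The ConsumerNetworkClient change is the same contract shift in miniature: ensureFreshMetadata was a void method that could block forever, and it now accepts a budget and tells the caller whether the metadata actually became fresh in time. A minimal sketch of that void-to-boolean conversion, assuming a CountDownLatch as a stand-in for the metadata-update signal:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Illustrative stand-in: a bounded wait that reports whether it finished,
    // instead of a void method that may block forever.
    public class BoundedWaitSketch {
        private final CountDownLatch updated = new CountDownLatch(1);

        // Returns true iff the update arrived within the budget.
        boolean ensureFresh(final long timeoutMs) throws InterruptedException {
            return updated.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        void onMetadataUpdate() {
            updated.countDown();
        }

        public static void main(String[] args) throws InterruptedException {
            final BoundedWaitSketch sketch = new BoundedWaitSketch();
            System.out.println(sketch.ensureFresh(100L)); // false: nothing arrived in time
            sketch.onMetadataUpdate();
            System.out.println(sketch.ensureFresh(100L)); // true: already fresh
        }
    }
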
diff --git a/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java b/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
index f8f99ec..97245a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
+++ b/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
@@ -36,7 +36,8 @@ package org.apache.kafka.common;
  * {@link org.apache.kafka.common.serialization.Deserializer} : The {@link ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked before {@link org.apache.kafka.common.serialization.Deserializer#deserialize(String, byte[])}
  * <p>
  * {@link org.apache.kafka.common.metrics.MetricsReporter} : The {@link ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked after first {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} invocation for Producer metrics reporter
- * and after first {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)} invocation for Consumer metrics reporters. The reporter may receive metric events from the network layer before this method is invoked.
+ * and after first {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)} invocation for Consumer metrics
+ * reporters. The reporter may receive metric events from the network layer before this method is invoked.
  * <h4>Broker</h4>
  * There is a single invocation {@link ClusterResourceListener#onUpdate(ClusterResource)} on broker start-up and the cluster metadata will never change.
  * <p>
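
For context, ClusterResourceListener is a mix-in: serializers, deserializers, interceptors, and metrics reporters may implement it to learn the cluster id once the client has metadata, which for consumers now means after the first poll(Duration) that completes a metadata update. A sketch of such a listener follows; the interceptor class itself is hypothetical:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.ClusterResource;
    import org.apache.kafka.common.ClusterResourceListener;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical interceptor that also listens for cluster metadata; onUpdate
    // fires once the consumer's first poll(Duration) has obtained metadata.
    public class ClusterIdLoggingInterceptor<K, V>
            implements ConsumerInterceptor<K, V>, ClusterResourceListener {

        @Override
        public void onUpdate(final ClusterResource clusterResource) {
            System.out.println("Connected to cluster: " + clusterResource.clusterId());
        }

        @Override
        public ConsumerRecords<K, V> onConsume(final ConsumerRecords<K, V> records) {
            return records; // pass records through unchanged
        }

        @Override
        public void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(final Map<String, ?> configs) {
        }
    }
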
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
index 4726ec1..f8ae840 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.KafkaException;
 /**
  * Exception used to indicate preemption of a blocking operation by an external thread.
  * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer#wakeup}
- * can be used to break out of an active {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)},
+ * can be used to break out of an active {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)},
  * which would raise an instance of this exception.
  */
 public class WakeupException extends KafkaException {
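
The javadoc above describes the canonical shutdown pattern: a second thread (often a JVM shutdown hook) calls wakeup(), and the polling thread exits its loop via WakeupException. A hedged end-to-end sketch under the new poll(Duration) API, assuming a broker at localhost:9092 and a topic named test-topic:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class WakeupSketch {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
            props.put("group.id", "wakeup-sketch");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // A second thread requests shutdown; poll() then throws WakeupException.
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

            try {
                consumer.subscribe(Collections.singleton("test-topic")); // assumed topic
                while (true) {
                    // Bounded poll from KIP-266: returns an empty batch on timeout.
                    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (final ConsumerRecord<String, String> record : records)
                        System.out.println(record.offset() + ": " + record.value());
                }
            } catch (WakeupException e) {
                // expected on shutdown; fall through to close()
            } finally {
                consumer.close();
            }
        }
    }
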
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8c147a5..ce722cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -78,6 +78,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -87,6 +88,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -353,7 +355,12 @@ public class KafkaConsumerTest {
         // initial fetch
         client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node);
 
-        consumer.poll(0);
+        // We need two update calls:
+        // 1. the first call "sends" the metadata update requests
+        // 2. the second one gets the response we already queued up
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+
         assertEquals(singleton(tp0), consumer.assignment());
 
         AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
@@ -362,7 +369,8 @@ public class KafkaConsumerTest {
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
 
         assertTrue(heartbeatReceived.get());
         consumer.close(0, TimeUnit.MILLISECONDS);
@@ -385,7 +393,9 @@ public class KafkaConsumerTest {
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
 
         // respond to the outstanding fetch so that we have data available on the next poll
         client.respondFrom(fetchResponse(tp0, 0, 5), node);
@@ -397,13 +407,64 @@ public class KafkaConsumerTest {
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
 
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
 
         assertTrue(heartbeatReceived.get());
         consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
+    public void verifyPollTimesOutDuringMetadataUpdate() throws Exception {
+        final Time time = new MockTime();
+        final Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        final Node node = cluster.nodes().get(0);
+
+        final Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        final MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        final PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        prepareRebalance(client, node, assignor, singletonList(tp0), null);
+
+        consumer.poll(Duration.ZERO);
+
+        // The underlying client should NOT get a fetch request
+        final Queue<ClientRequest> requests = client.requests();
+        Assert.assertEquals(0, requests.size());
+    }
+
+    @Test
+    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() throws Exception {
+        final Time time = new MockTime();
+        final Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        final Node node = cluster.nodes().get(0);
+
+        final Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        final MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        final PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
+        prepareRebalance(client, node, assignor, singletonList(tp0), null);
+
+        //noinspection deprecation
+        consumer.poll(0L);
+
+        // The underlying client SHOULD get a fetch request
+        final Queue<ClientRequest> requests = client.requests();
+        Assert.assertEquals(1, requests.size());
+        final Class<? extends AbstractRequest.Builder> aClass = requests.peek().requestBuilder().getClass();
+        Assert.assertEquals(FetchRequest.Builder.class, aClass);
+    }
+
+    @Test
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
@@ -425,7 +486,7 @@ public class KafkaConsumerTest {
         client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
         client.prepareResponse(fetchResponse(tp0, 50L, 5));
 
-        ConsumerRecords<String, String> records = consumer.poll(5);
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
         assertEquals(5, records.count());
         assertEquals(55L, consumer.position(tp0));
         consumer.close(0, TimeUnit.MILLISECONDS);
@@ -474,7 +535,7 @@ public class KafkaConsumerTest {
                     }
                 }, fetchResponse(tp0, 50L, 5));
 
-        ConsumerRecords<String, String> records = consumer.poll(5);
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
         assertEquals(5, records.count());
         assertEquals(singleton(tp0), records.partitions());
     }
@@ -501,7 +562,7 @@ public class KafkaConsumerTest {
 
         // lookup committed offset and find nothing
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
     }
 
     @Test
@@ -525,7 +586,7 @@ public class KafkaConsumerTest {
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
 
         assertEquals(539L, consumer.position(tp0));
     }
@@ -553,7 +614,7 @@ public class KafkaConsumerTest {
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
         client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
 
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
 
         assertEquals(50L, consumer.position(tp0));
     }
@@ -617,7 +678,9 @@ public class KafkaConsumerTest {
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
 
         // respond to the outstanding fetch so that we have data available on the next poll
         client.respondFrom(fetchResponse(tp0, 0, 5), node);
@@ -630,7 +693,7 @@ public class KafkaConsumerTest {
         // no data has been returned to the user yet, so the committed offset should be 0
         AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 0);
 
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
 
         assertTrue(commitReceived.get());
         consumer.close(0, TimeUnit.MILLISECONDS);
@@ -660,7 +723,9 @@ public class KafkaConsumerTest {
 
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+
         assertEquals(singleton(topic), consumer.subscription());
         assertEquals(singleton(tp0), consumer.assignment());
         consumer.close(0, TimeUnit.MILLISECONDS);
@@ -694,13 +759,16 @@ public class KafkaConsumerTest {
         Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
+
         assertEquals(singleton(topic), consumer.subscription());
 
         consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
 
         prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
 
         assertEquals(singleton(otherTopic), consumer.subscription());
         consumer.close(0, TimeUnit.MILLISECONDS);
@@ -723,7 +791,9 @@ public class KafkaConsumerTest {
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
 
         // respond to the outstanding fetch so that we have data available on the next poll
         client.respondFrom(fetchResponse(tp0, 0, 5), node);
@@ -732,7 +802,7 @@ public class KafkaConsumerTest {
         consumer.wakeup();
 
         try {
-            consumer.poll(0);
+            consumer.poll(Duration.ZERO);
             fail();
         } catch (WakeupException e) {
         }
@@ -741,7 +811,7 @@ public class KafkaConsumerTest {
         assertEquals(0, consumer.position(tp0));
 
         // the next poll should return the completed fetch
-        ConsumerRecords<String, String> records = consumer.poll(0);
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
         assertEquals(5, records.count());
         // Increment time asynchronously to clear timeouts in closing the consumer
         final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
@@ -773,13 +843,15 @@ public class KafkaConsumerTest {
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
 
         // interrupt the thread and call poll
         try {
             Thread.currentThread().interrupt();
             expectedException.expect(InterruptException.class);
-            consumer.poll(0);
+            consumer.poll(Duration.ZERO);
         } finally {
             // clear interrupted state again since this thread may be reused by JUnit
             Thread.interrupted();
@@ -810,7 +882,10 @@ public class KafkaConsumerTest {
         fetches1.put(t2p0, new FetchInfo(0, 10)); // not assigned and not fetched
         client.prepareResponseFrom(fetchResponse(fetches1), node);
 
-        ConsumerRecords<String, String> records = consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
         assertEquals(0, records.count());
         consumer.close(0, TimeUnit.MILLISECONDS);
     }
@@ -852,7 +927,9 @@ public class KafkaConsumerTest {
         // mock rebalance responses
         Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
 
         // verify that subscription is still the same, and now assignment has caught up
         assertTrue(consumer.subscription().size() == 2);
@@ -867,7 +944,7 @@ public class KafkaConsumerTest {
         client.respondFrom(fetchResponse(fetches1), node);
         client.poll(0, time.milliseconds());
 
-        ConsumerRecords<String, String> records = consumer.poll(0);
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
 
         // clear out the prefetch so it doesn't interfere with the rest of the test
         fetches1.put(tp0, new FetchInfo(1, 0));
@@ -904,7 +981,7 @@ public class KafkaConsumerTest {
         fetches2.put(t3p0, new FetchInfo(0, 100));
         client.prepareResponse(fetchResponse(fetches2));
 
-        records = consumer.poll(0);
+        records = consumer.poll(Duration.ofMillis(1));
 
         // verify that the fetch occurred as expected
         assertEquals(101, records.count());
@@ -965,13 +1042,15 @@ public class KafkaConsumerTest {
         // mock rebalance responses
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
 
         // verify that subscription is still the same, and now assignment has caught up
         assertTrue(consumer.subscription().equals(singleton(topic)));
         assertTrue(consumer.assignment().equals(singleton(tp0)));
 
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
 
         // subscription change
         consumer.subscribe(singleton(topic2), getConsumerRebalanceListener(consumer));
@@ -1037,7 +1116,7 @@ public class KafkaConsumerTest {
         client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L)));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));
 
-        ConsumerRecords<String, String> records = consumer.poll(5);
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
         assertEquals(1, records.count());
         assertEquals(11L, consumer.position(tp0));
 
@@ -1096,7 +1175,7 @@ public class KafkaConsumerTest {
         client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L)));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));
 
-        ConsumerRecords<String, String> records = consumer.poll(5);
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
         assertEquals(1, records.count());
         assertEquals(11L, consumer.position(tp0));
 
@@ -1171,7 +1250,7 @@ public class KafkaConsumerTest {
     @Test(expected = IllegalStateException.class)
     public void testPollWithNoSubscription() {
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
-            consumer.poll(0);
+            consumer.poll(Duration.ZERO);
         }
     }
 
@@ -1179,7 +1258,7 @@ public class KafkaConsumerTest {
     public void testPollWithEmptySubscription() {
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
             consumer.subscribe(Collections.<String>emptyList());
-            consumer.poll(0);
+            consumer.poll(Duration.ZERO);
         }
     }
 
@@ -1187,7 +1266,7 @@ public class KafkaConsumerTest {
     public void testPollWithEmptyUserAssignment() {
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
             consumer.assign(Collections.<TopicPartition>emptySet());
-            consumer.poll(0);
+            consumer.poll(Duration.ZERO);
         }
     }
 
@@ -1220,7 +1299,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCloseInterrupt() throws Exception {
-        consumerCloseTest(Long.MAX_VALUE, Collections.<AbstractResponse>emptyList(), 0, true);
+        consumerCloseTest(Long.MAX_VALUE, Collections.emptyList(), 0, true);
     }
 
     @Test
@@ -1270,7 +1349,9 @@ public class KafkaConsumerTest {
         client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
         client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
 
-        consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.poll(Duration.ZERO);
 
         // heartbeat fails due to rebalance in progress
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
@@ -1306,7 +1387,9 @@ public class KafkaConsumerTest {
         }, fetchResponse(tp0, 1, 1), node);
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
-        final ConsumerRecords<String, String> records = consumer.poll(0);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        final ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
         assertFalse(records.isEmpty());
         consumer.close(0, TimeUnit.MILLISECONDS);
     }
@@ -1333,10 +1416,13 @@ public class KafkaConsumerTest {
 
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+        consumer.updateAssignmentMetadataIfNeeded(0L);
+
         // Poll with responses
         client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
         client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
-        consumer.poll(0);
+        consumer.poll(Duration.ZERO);
 
         // Initiate close() after a commit request on another thread.
         // Kafka consumer is single-threaded, but the implementation allows calls on a
@@ -1457,7 +1543,7 @@ public class KafkaConsumerTest {
         }
 
         try {
-            consumer.poll(10);
+            consumer.poll(Duration.ZERO);
             fail("Expected an authentication error!");
         } catch (AuthenticationException e) {
             // OK
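
The tests above replace poll(0) with paired calls to updateAssignmentMetadataIfNeeded, a test-visible hook that runs one round of the metadata/join state machine without fetching. Applications migrating off poll(0) have no such hook; the closest equivalent, sketched below with a hypothetical helper, is to poll with a small bounded timeout until an assignment arrives. The commit message's caveat applies: any records these polls return are discarded here, which is only safe if the application seeks to its desired position afterwards.

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.Consumer;

    // Hypothetical helper, not part of the Kafka API.
    public final class AwaitAssignment {
        private AwaitAssignment() {
        }

        // Polls with a small bounded timeout until the group assigns partitions,
        // or gives up once the overall deadline passes.
        public static void awaitAssignment(final Consumer<?, ?> consumer, final Duration timeout) {
            final long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
            while (consumer.assignment().isEmpty()) {
                if (System.currentTimeMillis() >= deadlineMs)
                    throw new IllegalStateException("no partitions assigned within " + timeout);
                // Unlike the old poll(0), this may return records before we expect
                // them; they are dropped here, so seek afterwards if that matters.
                consumer.poll(Duration.ofMillis(50));
            }
        }
    }
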
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 67f8e8d..1d01eb6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -35,6 +36,32 @@ public class MockConsumerTest {
     @Test
     public void testSimpleMock() {
         consumer.subscribe(Collections.singleton("test"));
+        assertEquals(0, consumer.poll(Duration.ZERO).count());
+        consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
+        // Mock consumers need to seek manually since they cannot automatically reset offsets
+        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(new TopicPartition("test", 0), 0L);
+        beginningOffsets.put(new TopicPartition("test", 1), 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+        consumer.seek(new TopicPartition("test", 0), 0);
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
+        consumer.addRecord(rec1);
+        consumer.addRecord(rec2);
+        ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(1));
+        Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
+        assertEquals(rec1, iter.next());
+        assertEquals(rec2, iter.next());
+        assertFalse(iter.hasNext());
+        assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
+        consumer.commitSync();
+        assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testSimpleMockDeprecated() {
+        consumer.subscribe(Collections.singleton("test"));
         assertEquals(0, consumer.poll(1000).count());
         consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
         // Mock consumers need to seek manually since they cannot automatically reset offsets
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 1c88803..135763d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -102,7 +102,7 @@ public class AbstractCoordinatorTest {
         mockClient.blackout(coordinatorNode, 10L);
 
         long initialTime = mockTime.milliseconds();
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         long endTime = mockTime.milliseconds();
 
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
@@ -183,7 +183,7 @@ public class AbstractCoordinatorTest {
         assertTrue("New request sent while one is in progress", future == coordinator.lookupCoordinator());
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         assertTrue("New request not sent after previous completed", future != coordinator.lookupCoordinator());
     }
 
@@ -527,7 +527,7 @@ public class AbstractCoordinatorTest {
         mockClient.authenticationFailed(node, 300);
 
         try {
-            coordinator.ensureCoordinatorReady();
+            coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
             fail("Expected an authentication error.");
         } catch (AuthenticationException e) {
             // OK
@@ -537,7 +537,7 @@ public class AbstractCoordinatorTest {
         assertTrue(mockClient.connectionFailed(node));
 
         try {
-            coordinator.ensureCoordinatorReady();
+            coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
             fail("Expected an authentication error.");
         } catch (AuthenticationException e) {
             // OK
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0304190..1828873 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -144,7 +144,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testNormalHeartbeat() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // normal heartbeat
         time.sleep(sessionTimeoutMs);
@@ -162,7 +162,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = GroupAuthorizationException.class)
     public void testGroupDescribeUnauthorized() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
     }
 
     @Test(expected = GroupAuthorizationException.class)
@@ -170,17 +170,17 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
                 Errors.GROUP_AUTHORIZATION_FAILED));
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
     }
 
     @Test
     public void testCoordinatorNotAvailable() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -201,7 +201,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() throws Exception {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         int numRequests = 1000;
         TopicPartition tp = new TopicPartition("foo", 0);
@@ -233,7 +233,7 @@ public class ConsumerCoordinatorTest {
         // the coordinator as unknown which prevents additional retries to the same coordinator.
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
         Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = singletonMap(
@@ -259,7 +259,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testNotCoordinator() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // not_coordinator will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -280,7 +280,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testIllegalGeneration() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -298,13 +298,13 @@ public class ConsumerCoordinatorTest {
         assertTrue(future.isDone());
         assertTrue(future.failed());
         assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
-        assertTrue(coordinator.needRejoin());
+        assertTrue(coordinator.rejoinNeededOrPending());
     }
 
     @Test
     public void testUnknownConsumerId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // unknown_member_id will cause re-partition
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -322,13 +322,13 @@ public class ConsumerCoordinatorTest {
         assertTrue(future.isDone());
         assertTrue(future.failed());
         assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), future.exception());
-        assertTrue(coordinator.needRejoin());
+        assertTrue(coordinator.rejoinNeededOrPending());
     }
 
     @Test
     public void testCoordinatorDisconnect() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // coordinator disconnect will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -357,11 +357,11 @@ public class ConsumerCoordinatorTest {
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
                 Errors.INVALID_GROUP_ID));
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
     }
 
     @Test
@@ -375,7 +375,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
@@ -391,9 +391,9 @@ public class ConsumerCoordinatorTest {
                         sync.groupAssignment().containsKey(consumerId);
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
         assertEquals(singleton(topic1), subscriptions.groupSubscription());
         assertEquals(1, rebalanceListener.revokedCount);
@@ -414,7 +414,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
@@ -433,9 +433,9 @@ public class ConsumerCoordinatorTest {
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(2, subscriptions.assignedPartitions().size());
         assertEquals(2, subscriptions.groupSubscription().size());
         assertEquals(2, subscriptions.subscription().size());
@@ -456,7 +456,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(singleton(topic1), subscriptions.subscription());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         Map<String, List<String>> initialSubscription = singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
@@ -496,9 +496,9 @@ public class ConsumerCoordinatorTest {
         }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE));
 
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(updatedSubscriptionSet, subscriptions.subscription());
         assertEquals(newAssignmentSet, subscriptions.assignedPartitions());
         assertEquals(2, rebalanceListener.revokedCount);
@@ -518,7 +518,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
@@ -528,16 +528,16 @@ public class ConsumerCoordinatorTest {
         consumerClient.wakeup();
 
         try {
-            coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+            coordinator.poll(Long.MAX_VALUE);
         } catch (WakeupException e) {
             // ignore
         }
 
         // now complete the second half
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptySet(), rebalanceListener.revoked);
@@ -552,7 +552,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
@@ -566,9 +566,9 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
         assertEquals(singleton(topic1), subscriptions.groupSubscription());
         assertEquals(1, rebalanceListener.revokedCount);
@@ -589,7 +589,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
@@ -605,9 +605,9 @@ public class ConsumerCoordinatorTest {
         // expect the client to force a metadata update; when it does, give it both topics
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(2, subscriptions.assignedPartitions().size());
         assertEquals(2, subscriptions.subscription().size());
         assertEquals(1, rebalanceListener.revokedCount);
@@ -667,12 +667,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_SERVER_ERROR));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
     }
 
     @Test
@@ -682,7 +682,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // join initially, but let the coordinator return an unknown member id
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
@@ -698,9 +698,9 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
     }
 
@@ -711,7 +711,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
@@ -721,9 +721,9 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
     }
 
@@ -734,7 +734,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
@@ -750,9 +750,9 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
     }
 
@@ -767,7 +767,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
@@ -776,15 +776,15 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
 
         // a new partition is added to the topic
         metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.<String>emptySet(), time.milliseconds());
 
         // we should detect the change and ask for reassignment
-        assertTrue(coordinator.needRejoin());
+        assertTrue(coordinator.rejoinNeededOrPending());
     }
 
     @Test
@@ -804,7 +804,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // prepare initial rebalance
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, topics);
@@ -833,9 +833,9 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
 
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
     }
 
@@ -873,16 +873,16 @@ public class ConsumerCoordinatorTest {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         if (!assign) {
-            assertFalse(coordinator.needRejoin());
+            assertFalse(coordinator.rejoinNeededOrPending());
             assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
         }
         assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
@@ -891,11 +891,11 @@ public class ConsumerCoordinatorTest {
         client.poll(0, time.milliseconds());
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
         assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
         if (!assign) {
-            assertFalse(coordinator.needRejoin());
+            assertFalse(coordinator.rejoinNeededOrPending());
             assertEquals(singleton(t1p), rebalanceListener.assigned);
         }
     }
@@ -937,7 +937,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(singleton(t1p), rebalanceListener.revoked);
@@ -950,16 +950,16 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // a disconnect from the original coordinator will cause rediscovery and a rejoin
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -971,11 +971,11 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
     }
 
     @Test
@@ -983,7 +983,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(singleton(t1p));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
 
@@ -1005,7 +1005,7 @@ public class ConsumerCoordinatorTest {
 
     private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // Send two async commits and fail the first one with an error.
         // This should cause a coordinator disconnect which will cancel the second request.
@@ -1038,7 +1038,7 @@ public class ConsumerCoordinatorTest {
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertFalse(client.hasPendingResponses());
     }
 
@@ -1055,23 +1055,23 @@ public class ConsumerCoordinatorTest {
 
         // Send an offset commit, but let it fail with a retriable error
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertTrue(coordinator.coordinatorUnknown());
 
         // After the disconnect, we should rediscover the coordinator
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
         subscriptions.seek(t1p, 200);
 
         // Until the retry backoff has expired, we should not retry the offset commit
         time.sleep(retryBackoffMs / 2);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertEquals(0, client.inFlightRequestCount());
 
         // Once the backoff expires, we should retry
         time.sleep(retryBackoffMs / 2);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertEquals(1, client.inFlightRequestCount());
         respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
     }
@@ -1088,28 +1088,28 @@ public class ConsumerCoordinatorTest {
         time.sleep(autoCommitIntervalMs);
 
         // Send the offset commit request, but do not respond
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertEquals(1, client.inFlightRequestCount());
 
         time.sleep(autoCommitIntervalMs / 2);
 
         // Ensure that no additional offset commit is sent
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertEquals(1, client.inFlightRequestCount());
 
         respondToOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertEquals(0, client.inFlightRequestCount());
 
         subscriptions.seek(t1p, 200);
 
         // If we poll again before the auto-commit interval, there should be no new sends
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertEquals(0, client.inFlightRequestCount());
 
         // After the remainder of the interval passes, we send a new offset commit
         time.sleep(autoCommitIntervalMs / 2);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertEquals(1, client.inFlightRequestCount());
         respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
     }
@@ -1124,7 +1124,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // haven't joined, so should not cause a commit
         time.sleep(autoCommitIntervalMs);
@@ -1132,13 +1132,13 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
 
         subscriptions.seek(t1p, 100);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertFalse(client.hasPendingResponses());
     }
 
@@ -1151,11 +1151,11 @@ public class ConsumerCoordinatorTest {
         subscriptions.seek(t1p, 100);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertFalse(client.hasPendingResponses());
     }
 
@@ -1174,12 +1174,12 @@ public class ConsumerCoordinatorTest {
 
         // now find the coordinator
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // sleep only for the retry backoff
         time.sleep(retryBackoffMs);
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
         assertFalse(client.hasPendingResponses());
     }
 
@@ -1188,7 +1188,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(singleton(t1p));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
 
@@ -1204,7 +1204,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetAsyncWithDefaultCallback() {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
@@ -1245,7 +1245,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetAsyncFailedWithDefaultCallback() {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
         coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
@@ -1256,7 +1256,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncCoordinatorNotAvailable() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
@@ -1272,7 +1272,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncNotCoordinator() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // async commit with a NOT_COORDINATOR error
         MockCommitCallback cb = new MockCommitCallback();
@@ -1288,7 +1288,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncDisconnected() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
@@ -1304,7 +1304,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetSyncNotCoordinator() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // sync commit with a NOT_COORDINATOR error (should rediscover the coordinator and then retry the commit request)
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
@@ -1316,7 +1316,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetSyncCoordinatorNotAvailable() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // sync commit with COORDINATOR_NOT_AVAILABLE (should rediscover the coordinator and then retry the commit request)
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
@@ -1328,7 +1328,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetSyncCoordinatorDisconnected() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
         prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
@@ -1340,7 +1340,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         final List<OffsetAndMetadata> committedOffsets = Collections.synchronizedList(new ArrayList<OffsetAndMetadata>());
         final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L);
@@ -1376,7 +1376,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRetryCommitUnknownTopicOrPartition() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
         client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
@@ -1388,7 +1388,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetMetadataTooLarge() {
         // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.OFFSET_METADATA_TOO_LARGE);
         coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
@@ -1398,7 +1398,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetIllegalGeneration() {
         // we cannot retry if a rebalance occurs before the commit completes
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);
         coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
@@ -1408,7 +1408,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetUnknownMemberId() {
         // we cannot retry if a rebalance occurs before the commit completes
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
         coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
@@ -1418,7 +1418,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completes
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
         coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
@@ -1427,7 +1427,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = KafkaException.class)
     public void testCommitOffsetSyncCallbackWithNonRetriableException() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // sync commit with invalid partitions should throw if we have no callback
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_SERVER_ERROR);
@@ -1453,18 +1453,18 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetSyncWithoutFutureGetsCompleted() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         assertFalse(coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), 0));
     }
 
     @Test
     public void testRefreshOffset() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
 
         assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
         assertTrue(subscriptions.hasAllFetchPositions());
@@ -1474,12 +1474,12 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRefreshOffsetLoadInProgress() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
 
         assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
         assertTrue(subscriptions.hasAllFetchPositions());
@@ -1489,12 +1489,12 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRefreshOffsetsGroupNotAuthorized() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
         try {
-            coordinator.refreshCommittedOffsetsIfNeeded();
+            coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
             fail("Expected group authorization error");
         } catch (GroupAuthorizationException e) {
             assertEquals(groupId, e.groupId());
@@ -1504,23 +1504,23 @@ public class ConsumerCoordinatorTest {
     @Test(expected = KafkaException.class)
     public void testRefreshOffsetUnknownTopicOrPartition() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
     }
 
     @Test
     public void testRefreshOffsetNotCoordinatorForConsumer() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
 
         assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
         assertTrue(subscriptions.hasAllFetchPositions());
@@ -1530,11 +1530,11 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRefreshOffsetWithNoFetchableOffsets() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", -1L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
 
         assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
         assertEquals(Collections.emptySet(), subscriptions.partitionsNeedingReset(time.milliseconds()));
@@ -1548,7 +1548,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 500L);
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
 
         assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
         assertTrue(subscriptions.hasAllFetchPositions());
@@ -1562,7 +1562,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.requestOffsetReset(t1p, OffsetResetStrategy.EARLIEST);
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE);
 
         assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
         assertFalse(subscriptions.hasAllFetchPositions());
@@ -1741,17 +1741,17 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                 ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit, leaveGroup);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         if (useGroupManagement) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
             client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-            coordinator.joinGroupIfNeeded();
+            coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
         } else
             subscriptions.assignFromUser(singleton(t1p));
 
         subscriptions.seek(t1p, 100);
-        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        coordinator.poll(Long.MAX_VALUE);
 
         return coordinator;
     }
@@ -1910,10 +1910,10 @@ public class ConsumerCoordinatorTest {
                                                     ConsumerCoordinator coordinator,
                                                     List<TopicPartition> assignment) {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(assignment, Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
     }
 
     private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) {
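
The test changes above all follow one mechanical rule: every coordinator entry point that previously blocked without bound now takes an explicit timeout, and the tests keep their old blocking behavior by passing `Long.MAX_VALUE`; `needRejoin()` is likewise renamed to `rejoinNeededOrPending()`. A condensed before/after sketch of the pattern (method names are from the diff above; the `MockClient` response setup is elided):

    // before: blocking calls with no caller-controlled bound
    coordinator.ensureCoordinatorReady();
    coordinator.joinGroupIfNeeded();
    coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
    assertFalse(coordinator.needRejoin());

    // after: the caller supplies the bound; Long.MAX_VALUE preserves
    // the old "wait forever" semantics in tests
    coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
    coordinator.joinGroupIfNeeded(Long.MAX_VALUE);
    coordinator.poll(Long.MAX_VALUE);
    assertFalse(coordinator.rejoinNeededOrPending());
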
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index c64597b..413997f 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -38,6 +38,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -85,12 +86,12 @@ public class ClientAuthenticationFailureTest {
 
         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {
             consumer.subscribe(Arrays.asList(topic));
-            consumer.poll(100);
+            consumer.poll(Duration.ofSeconds(10));
             fail("Expected an authentication error!");
         } catch (SaslAuthenticationException e) {
             // OK
         } catch (Exception e) {
-            fail("Expected only an authentication error, but another error occurred: " + e.getMessage());
+            throw new AssertionError("Expected only an authentication error, but another error occurred.", e);
         }
     }
 
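In application code, the same change surfaces as the new `KafkaConsumer#poll(Duration)` overload used in the test above: the `Duration` bounds the entire call, metadata wait included, and an empty record set is returned if the timeout expires. A minimal usage sketch (broker address, topic, and group id are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollDurationExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "example-group");           // placeholder group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic
                // the Duration bounds the whole call, metadata wait included;
                // on expiry, poll returns an empty ConsumerRecords rather than blocking
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
            }
        }
    }

Separately, replacing `fail(message + e.getMessage())` with an `AssertionError` that carries the unexpected exception as its cause, as the updated catch block does, preserves the full stack trace instead of flattening it to a message string.
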
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 60407c1..796ecdf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -103,6 +103,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         return "connect";
     }
 
+    // expose for tests
+    @Override
+    protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
+        return super.ensureCoordinatorReady(timeoutMs);
+    }
+
     public void poll(long timeout) {
         // poll for io until the timeout expires
         final long start = time.milliseconds();
@@ -111,11 +117,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
 
         do {
             if (coordinatorUnknown()) {
-                ensureCoordinatorReady();
+                ensureCoordinatorReady(Long.MAX_VALUE);
                 now = time.milliseconds();
             }
 
-            if (needRejoin()) {
+            if (rejoinNeededOrPending()) {
                 ensureActiveGroup();
                 now = time.milliseconds();
             }
@@ -282,8 +288,8 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    protected boolean needRejoin() {
-        return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
+    protected boolean rejoinNeededOrPending() {
+        return super.rejoinNeededOrPending() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
     }
 
     public String memberId() {
@@ -298,13 +304,13 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     public String ownerUrl(String connector) {
-        if (needRejoin() || !isLeader())
+        if (rejoinNeededOrPending() || !isLeader())
             return null;
         return leaderState.ownerUrl(connector);
     }
 
     public String ownerUrl(ConnectorTaskId task) {
-        if (needRejoin() || !isLeader())
+        if (rejoinNeededOrPending() || !isLeader())
             return null;
         return leaderState.ownerUrl(task);
     }
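
The rename from `needRejoin()` to `rejoinNeededOrPending()` is more than cosmetic: the new name also covers the window in which a JoinGroup request is already in flight, which is why the leader-only lookups above return null whenever it is true. A caller-side sketch of how Connect code can treat those lookups (the retry shape and `workerSyncTimeoutMs` are assumptions, not part of this patch):

    // sketch: resolve a connector's owner, tolerating an in-flight rebalance
    String ownerFor(WorkerCoordinator coordinator, String connector, long workerSyncTimeoutMs) {
        String url = coordinator.ownerUrl(connector);
        if (url == null) {
            // rejoinNeededOrPending() was true or this worker is not the leader;
            // drive the coordinator forward, then ask once more
            coordinator.poll(workerSyncTimeoutMs);
            url = coordinator.ownerUrl(connector);
        }
        return url;
    }
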
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 154b1df..e1017f2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -208,7 +208,7 @@ public class WorkerCoordinatorTest {
         final String consumerId = "leader";
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // normal join group
         Map<String, Long> memberConfigOffsets = new HashMap<>();
@@ -227,7 +227,7 @@ public class WorkerCoordinatorTest {
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
         coordinator.ensureActiveGroup();
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
         assertFalse(rebalanceListener.assignment.failed());
@@ -248,7 +248,7 @@ public class WorkerCoordinatorTest {
         final String memberId = "member";
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
@@ -264,7 +264,7 @@ public class WorkerCoordinatorTest {
                 Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();
 
-        assertFalse(coordinator.needRejoin());
+        assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
         assertFalse(rebalanceListener.assignment.failed());
@@ -289,7 +289,7 @@ public class WorkerCoordinatorTest {
         final String memberId = "member";
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // config mismatch results in assignment error
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
@@ -320,7 +320,7 @@ public class WorkerCoordinatorTest {
         PowerMock.replayAll();
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
 
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a122297..034557e 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -121,6 +121,10 @@ object ZooKeeperTestHarness {
     val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
       threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
     }
-    assertTrue(s"Found unexpected threads during $context, allThreads=$threads", noUnexpected)
+    assertTrue(
+      s"Found unexpected threads during $context, allThreads=$threads, " +
+        s"unexpected=${threads.filterNot(t => unexpectedThreadNames.forall(s => !t.contains(s)))}",
+      noUnexpected
+    )
   }
 }
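
The widened assertion message reports exactly which thread names matched an unexpected fragment, rather than leaving the reader to scan the whole thread list. The same diagnostic idea in plain JUnit, for comparison (names here are illustrative, not from the patch):

    import static org.junit.Assert.assertTrue;

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    // fail with the offending subset in the message, not just the full list
    static void assertNoUnexpectedThreads(List<String> threadNames, Set<String> badFragments) {
        List<String> unexpected = threadNames.stream()
                .filter(name -> badFragments.stream().anyMatch(name::contains))
                .collect(Collectors.toList());
        assertTrue("Found unexpected threads: " + unexpected, unexpected.isEmpty());
    }
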
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index ce88917..3070e36 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -85,6 +85,7 @@ public class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
         super.assign(partitions);
     }
 
+    @Deprecated
     @Override
     public ConsumerRecords<byte[], byte[]> poll(long timeout) {
         // add buffered records to MockConsumer

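Restating `@Deprecated` on the override mirrors the deprecation of `poll(long)` on the `Consumer` interface, so callers holding the mock's type still get a deprecation warning. An illustration of the pattern with hypothetical classes (not from this patch):

    import java.time.Duration;

    class LegacyPollingClient {
        /** @deprecated use {@link #poll(Duration)} instead */
        @Deprecated
        public int poll(long timeoutMs) {
            return poll(Duration.ofMillis(timeoutMs));
        }

        public int poll(Duration timeout) {
            return 0; // pretend some records were fetched
        }
    }

    class MockPollingClient extends LegacyPollingClient {
        @Deprecated // restated so users of the mock are warned too
        @Override
        public int poll(long timeoutMs) {
            // test hook: a subclass could buffer records before delegating
            return super.poll(timeoutMs);
        }
    }
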
-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
