kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/3] kafka git commit: KAFKA-2123: add callback in commit api and use a delayed queue for async requests; reviewed by Ewen Cheslack-Postava and Guozhang Wang
Date Wed, 15 Jul 2015 19:40:45 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index c1c8172..8e3cd09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -12,14 +12,14 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.CommitType;
+import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.ConsumerMetadataRequest;
 import org.apache.kafka.common.requests.ConsumerMetadataResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -41,15 +40,15 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
-import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,23 +61,27 @@ public final class Coordinator {
 
     private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
 
-    private final KafkaClient client;
-
+    private final ConsumerNetworkClient client;
     private final Time time;
     private final String groupId;
     private final Heartbeat heartbeat;
+    private final HeartbeatTask heartbeatTask;
     private final int sessionTimeoutMs;
     private final String assignmentStrategy;
     private final SubscriptionState subscriptions;
     private final CoordinatorMetrics sensors;
+    private final long requestTimeoutMs;
+    private final long retryBackoffMs;
+    private final RebalanceCallback rebalanceCallback;
     private Node consumerCoordinator;
     private String consumerId;
     private int generation;
 
+
     /**
      * Initialize the coordination manager.
      */
-    public Coordinator(KafkaClient client,
+    public Coordinator(ConsumerNetworkClient client,
                        String groupId,
                        int sessionTimeoutMs,
                        String assignmentStrategy,
@@ -86,10 +89,13 @@ public final class Coordinator {
                        Metrics metrics,
                        String metricGrpPrefix,
                        Map<String, String> metricTags,
-                       Time time) {
+                       Time time,
+                       long requestTimeoutMs,
+                       long retryBackoffMs,
+                       RebalanceCallback rebalanceCallback) {
 
-        this.time = time;
         this.client = client;
+        this.time = time;
         this.generation = -1;
         this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
         this.groupId = groupId;
@@ -98,19 +104,190 @@ public final class Coordinator {
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.assignmentStrategy = assignmentStrategy;
         this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
+        this.heartbeatTask = new HeartbeatTask();
         this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.retryBackoffMs = retryBackoffMs;
+        this.rebalanceCallback = rebalanceCallback;
+    }
+
+    /**
+     * Refresh the committed offsets for provided partitions.
+     */
+    public void refreshCommittedOffsetsIfNeeded() {
+        if (subscriptions.refreshCommitsNeeded()) {
+            Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
+            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                this.subscriptions.committed(tp, entry.getValue());
+            }
+            this.subscriptions.commitsRefreshed();
+        }
+    }
+
+    /**
+     * Fetch the current committed offsets from the coordinator for a set of partitions.
+     * @param partitions The partitions to fetch offsets for
+     * @return A map from partition to the committed offset
+     */
+    public Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+        while (true) {
+            ensureCoordinatorKnown();
+            ensurePartitionAssignment();
+
+            // contact coordinator to fetch committed offsets
+            RequestFuture<Map<TopicPartition, Long>> future = sendOffsetFetchRequest(partitions);
+            client.poll(future);
+
+            if (future.succeeded())
+                return future.value();
+
+            if (!future.isRetriable())
+                throw future.exception();
+
+            Utils.sleep(retryBackoffMs);
+        }
+    }
+
+    /**
+     * Ensure that we have a valid partition assignment from the coordinator.
+     */
+    public void ensurePartitionAssignment() {
+        if (!subscriptions.partitionAssignmentNeeded())
+            return;
+
+        // execute the user's callback before rebalance
+        log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
+        try {
+            Set<TopicPartition> revoked = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
+            rebalanceCallback.onPartitionsRevoked(revoked);
+        } catch (Exception e) {
+            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+                    + " failed on partition revocation: ", e);
+        }
+
+        reassignPartitions();
+
+        // execute the user's callback after rebalance
+        log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
+        try {
+            Set<TopicPartition> assigned = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
+            rebalanceCallback.onPartitionsAssigned(assigned);
+        } catch (Exception e) {
+            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+                    + " failed on partition assignment: ", e);
+        }
+    }
+
+    private void reassignPartitions() {
+        while (subscriptions.partitionAssignmentNeeded()) {
+            ensureCoordinatorKnown();
+
+            // ensure that there are no pending requests to the coordinator. This is important
+            // in particular to avoid resending a pending JoinGroup request.
+            if (client.pendingRequestCount(this.consumerCoordinator) > 0) {
+                client.awaitPendingRequests(this.consumerCoordinator);
+                continue;
+            }
+
+            RequestFuture<Void> future = sendJoinGroupRequest();
+            client.poll(future);
+
+            if (future.failed()) {
+                if (!future.isRetriable())
+                    throw future.exception();
+                Utils.sleep(retryBackoffMs);
+            }
+        }
+    }
+
+    /**
+     * Block until the coordinator for this group is known.
+     */
+    public void ensureCoordinatorKnown() {
+        while (coordinatorUnknown()) {
+            RequestFuture<Void> future = sendConsumerMetadataRequest();
+            client.poll(future, requestTimeoutMs);
+
+            if (future.failed())
+                client.awaitMetadataUpdate();
+        }
+    }
+
+    /**
+     * Commit offsets. This call blocks (regardless of commitType) until the coordinator
+     * can receive the commit request. Once the request has been made, however, only the
+     * synchronous commits will wait for a successful response from the coordinator.
+     * @param offsets Offsets to commit.
+     * @param commitType Commit policy
+     * @param callback Callback to be executed when the commit request finishes
+     */
+    public void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+        if (commitType == CommitType.ASYNC)
+            commitOffsetsAsync(offsets, callback);
+        else
+            commitOffsetsSync(offsets, callback);
+    }
+
+    private class HeartbeatTask implements DelayedTask {
+
+        public void reset() {
+            // start or restart the heartbeat task to be executed at the next chance
+            long now = time.milliseconds();
+            heartbeat.resetSessionTimeout(now);
+            client.unschedule(this);
+            client.schedule(this, now);
+        }
+
+        @Override
+        public void run(final long now) {
+            if (!subscriptions.partitionsAutoAssigned() ||
+                    subscriptions.partitionAssignmentNeeded() ||
+                    coordinatorUnknown())
+                // no need to send if we're not using auto-assignment or if we are
+                // awaiting a rebalance
+                return;
+
+            if (heartbeat.sessionTimeoutExpired(now)) {
+                // we haven't received a successful heartbeat in one session interval
+                // so mark the coordinator dead
+                coordinatorDead();
+                return;
+            }
+
+            if (!heartbeat.shouldHeartbeat(now)) {
+                // we don't need to heartbeat now, so reschedule for when we do
+                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
+            } else {
+                heartbeat.sentHeartbeat(now);
+                RequestFuture<Void> future = sendHeartbeatRequest();
+                future.addListener(new RequestFutureListener<Void>() {
+                    @Override
+                    public void onSuccess(Void value) {
+                        long now = time.milliseconds();
+                        heartbeat.receiveHeartbeat(now);
+                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
+                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
+                    }
+
+                    @Override
+                    public void onFailure(RuntimeException e) {
+                        client.schedule(HeartbeatTask.this, retryBackoffMs);
+                    }
+                });
+            }
+        }
     }
 
     /**
      * Send a request to get a new partition assignment. This is a non-blocking call which sends
      * a JoinGroup request to the coordinator (if it is available). The returned future must
      * be polled to see if the request completed successfully.
-     * @param now The current time in milliseconds
      * @return A request future whose completion indicates the result of the JoinGroup request.
      */
-    public RequestFuture<Void> assignPartitions(final long now) {
-        final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
-        if (future.isDone()) return future;
+    private RequestFuture<Void> sendJoinGroupRequest() {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
 
         // send a join group request to the coordinator
         List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics());
@@ -124,25 +301,20 @@ public final class Coordinator {
 
         // create the request for the coordinator
         log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id());
+        return client.send(consumerCoordinator, ApiKeys.JOIN_GROUP, request)
+                .compose(new JoinGroupResponseHandler());
+    }
 
-        RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
-            @Override
-            public void onComplete(ClientResponse resp) {
-                handleJoinResponse(resp, future);
-            }
-        };
+    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, Void> {
 
-        sendCoordinator(ApiKeys.JOIN_GROUP, request.toStruct(), completionHandler, now);
-        return future;
-    }
+        @Override
+        public JoinGroupResponse parse(ClientResponse response) {
+            return new JoinGroupResponse(response.responseBody());
+        }
 
-    private void handleJoinResponse(ClientResponse response, RequestFuture<Void> future) {
-        if (response.wasDisconnected()) {
-            handleCoordinatorDisconnect(response);
-            future.retryWithNewCoordinator();
-        } else {
+        @Override
+        public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) {
             // process the response
-            JoinGroupResponse joinResponse = new JoinGroupResponse(response.responseBody());
             short errorCode = joinResponse.errorCode();
 
             if (errorCode == Errors.NONE.code()) {
@@ -152,36 +324,36 @@ public final class Coordinator {
                 // set the flag to refresh last committed offsets
                 subscriptions.needRefreshCommits();
 
-                log.debug("Joined group: {}", response);
+                log.debug("Joined group: {}", joinResponse.toStruct());
 
                 // record re-assignment time
-                this.sensors.partitionReassignments.record(response.requestLatencyMs());
+                sensors.partitionReassignments.record(response.requestLatencyMs());
 
                 // update partition assignment
                 subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
+                heartbeatTask.reset();
                 future.complete(null);
             } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
                 // reset the consumer id and retry immediately
                 Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
                 log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
                         groupId);
-
-                future.retryNow();
+                future.raise(Errors.UNKNOWN_CONSUMER_ID);
             } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
                     || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                 // re-discover the coordinator and retry with backoff
                 coordinatorDead();
                 log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
                         groupId);
-                future.retryWithNewCoordinator();
+                future.raise(Errors.forCode(errorCode));
             } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
                     || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
                     || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
                 // log the error and re-throw the exception
-                KafkaException e = Errors.forCode(errorCode).exception();
+                Errors error = Errors.forCode(errorCode);
                 log.error("Attempt to join group {} failed due to: {}",
-                        groupId, e.getMessage());
-                future.raise(e);
+                        groupId, error.exception().getMessage());
+                future.raise(error);
             } else {
                 // unexpected error, throw the exception
                 future.raise(new KafkaException("Unexpected error in join group response: "
@@ -190,55 +362,134 @@ public final class Coordinator {
         }
     }
 
+    private void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, final ConsumerCommitCallback callback) {
+        this.subscriptions.needRefreshCommits();
+        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+        if (callback != null) {
+            future.addListener(new RequestFutureListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    callback.onComplete(offsets, null);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    callback.onComplete(offsets, e);
+                }
+            });
+        }
+    }
+
+    private void commitOffsetsSync(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
+        while (true) {
+            ensureCoordinatorKnown();
+            ensurePartitionAssignment();
+
+            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+            client.poll(future);
+
+            if (future.succeeded()) {
+                if (callback != null)
+                    callback.onComplete(offsets, null);
+                return;
+            }
+
+            if (!future.isRetriable()) {
+                if (callback == null)
+                    throw future.exception();
+                else
+                    callback.onComplete(offsets, future.exception());
+                return;
+            }
+
+            Utils.sleep(retryBackoffMs);
+        }
+    }
+
     /**
      * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
      * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
      * asynchronous case.
      *
      * @param offsets The list of offsets per partition that should be committed.
-     * @param now The current time
      * @return A request future whose value indicates whether the commit was successful or not
      */
-    public RequestFuture<Void> commitOffsets(final Map<TopicPartition, Long> offsets, long now) {
-        final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
-        if (future.isDone()) return future;
+    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Long> offsets) {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
 
-        if (offsets.isEmpty()) {
-            future.complete(null);
-        } else {
-            // create the offset commit request
-            Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
-            offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
-            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
-                offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
-            OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
+        if (offsets.isEmpty())
+            return RequestFuture.voidSuccess();
+
+        // create the offset commit request
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
+        offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
+        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
+        OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
                 this.generation,
                 this.consumerId,
                 OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
-            RequestCompletionHandler handler = new OffsetCommitCompletionHandler(offsets, future);
-            sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
-        }
-
-        return future;
+        return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, req)
+                .compose(new OffsetCommitResponseHandler(offsets));
     }
 
-    private <T> RequestFuture<T> newCoordinatorRequestFuture(long now) {
-        if (coordinatorUnknown())
-            return RequestFuture.newCoordinatorNeeded();
 
-        if (client.ready(this.consumerCoordinator, now))
-            // We have an open connection and we're ready to send
-            return new RequestFuture<T>();
+    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
+
+        private final Map<TopicPartition, Long> offsets;
+
+        public OffsetCommitResponseHandler(Map<TopicPartition, Long> offsets) {
+            this.offsets = offsets;
+        }
 
-        if (this.client.connectionFailed(this.consumerCoordinator)) {
-            coordinatorDead();
-            return RequestFuture.newCoordinatorNeeded();
+        @Override
+        public OffsetCommitResponse parse(ClientResponse response) {
+            return new OffsetCommitResponse(response.responseBody());
         }
 
-        // The connection has been initiated, so we need to poll to finish it
-        return RequestFuture.pollNeeded();
+        @Override
+        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
+            sensors.commitLatency.record(response.requestLatencyMs());
+            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
+                TopicPartition tp = entry.getKey();
+                long offset = this.offsets.get(tp);
+                short errorCode = entry.getValue();
+                if (errorCode == Errors.NONE.code()) {
+                    log.debug("Committed offset {} for partition {}", offset, tp);
+                    subscriptions.committed(tp, offset);
+                } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                        || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                    coordinatorDead();
+                    future.raise(Errors.forCode(errorCode));
+                    return;
+                } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
+                        || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
+                    // do not need to throw the exception but just log the error
+                    log.error("Error committing partition {} at offset {}: {}",
+                            tp,
+                            offset,
+                            Errors.forCode(errorCode).exception().getMessage());
+                } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
+                        || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                    // need to re-join group
+                    subscriptions.needReassignment();
+                    future.raise(Errors.forCode(errorCode));
+                    return;
+                } else {
+                    // do not need to throw the exception but just log the error
+                    future.raise(Errors.forCode(errorCode));
+                    log.error("Error committing partition {} at offset {}: {}",
+                            tp,
+                            offset,
+                            Errors.forCode(errorCode).exception().getMessage());
+                }
+            }
+
+            future.complete(null);
+        }
     }
 
     /**
@@ -246,35 +497,30 @@ public final class Coordinator {
      * returned future can be polled to get the actual offsets returned from the broker.
      *
      * @param partitions The set of partitions to get offsets for.
-     * @param now The current time in milliseconds
      * @return A request future containing the committed offsets.
      */
-    public RequestFuture<Map<TopicPartition, Long>> fetchOffsets(Set<TopicPartition> partitions, long now) {
-        final RequestFuture<Map<TopicPartition, Long>> future = newCoordinatorRequestFuture(now);
-        if (future.isDone()) return future;
+    private RequestFuture<Map<TopicPartition, Long>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
 
-        log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
+        log.debug("Fetching committed offsets for partitions: {}",  Utils.join(partitions, ", "));
         // construct the request
         OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
 
         // send the request with a callback
-        RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
-            @Override
-            public void onComplete(ClientResponse resp) {
-                handleOffsetFetchResponse(resp, future);
-            }
-        };
-        sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
-        return future;
+        return client.send(consumerCoordinator, ApiKeys.OFFSET_FETCH, request)
+                .compose(new OffsetFetchResponseHandler());
     }
 
-    private void handleOffsetFetchResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
-        if (resp.wasDisconnected()) {
-            handleCoordinatorDisconnect(resp);
-            future.retryWithNewCoordinator();
-        } else {
-            // parse the response to get the offsets
-            OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
+    private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, Long>> {
+
+        @Override
+        public OffsetFetchResponse parse(ClientResponse response) {
+            return new OffsetFetchResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, Long>> future) {
             Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
@@ -285,19 +531,21 @@ public final class Coordinator {
                             .getMessage());
                     if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
                         // just retry
-                        future.retryAfterBackoff();
+                        future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
                     } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                         // re-discover the coordinator and retry
                         coordinatorDead();
-                        future.retryWithNewCoordinator();
+                        future.raise(Errors.NOT_COORDINATOR_FOR_CONSUMER);
                     } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
                             || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
                         // need to re-join group
                         subscriptions.needReassignment();
+                        future.raise(Errors.forCode(data.errorCode));
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response: "
                                 + Errors.forCode(data.errorCode).exception().getMessage()));
                     }
+                    return;
                 } else if (data.offset >= 0) {
                     // record the position with the offset (-1 indicates no committed offset to fetch)
                     offsets.put(tp, data.offset);
@@ -306,82 +554,47 @@ public final class Coordinator {
                 }
             }
 
-            if (!future.isDone())
-                future.complete(offsets);
-        }
-    }
-
-    /**
-     * Attempt to heartbeat the consumer coordinator if necessary, and check if the coordinator is still alive.
-     *
-     * @param now The current time
-     */
-    public void maybeHeartbeat(long now) {
-        if (heartbeat.shouldHeartbeat(now) && coordinatorReady(now)) {
-            HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
-            sendCoordinator(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now);
-            this.heartbeat.sentHeartbeat(now);
+            future.complete(offsets);
         }
     }
 
     /**
-     * Get the time until the next heartbeat is needed.
-     * @param now The current time
-     * @return The duration in milliseconds before the next heartbeat will be needed.
+     * Send a heartbeat request now (visible only for testing).
      */
-    public long timeToNextHeartbeat(long now) {
-        return heartbeat.timeToNextHeartbeat(now);
-    }
-
-    /**
-     * Check whether the coordinator has any in-flight requests.
-     * @return true if the coordinator has pending requests.
-     */
-    public boolean hasInFlightRequests() {
-        return !coordinatorUnknown() && client.inFlightRequestCount(consumerCoordinator.idString()) > 0;
+    public RequestFuture<Void> sendHeartbeatRequest() {
+        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
+        return client.send(consumerCoordinator, ApiKeys.HEARTBEAT, req)
+                .compose(new HeartbeatCompletionHandler());
     }
 
     public boolean coordinatorUnknown() {
         return this.consumerCoordinator == null;
     }
 
-    private boolean coordinatorReady(long now) {
-        return !coordinatorUnknown() && this.client.ready(this.consumerCoordinator, now);
-    }
-
     /**
      * Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to
      * one of the brokers. The returned future should be polled to get the result of the request.
      * @return A request future which indicates the completion of the metadata request
      */
-    public RequestFuture<Void> discoverConsumerCoordinator() {
+    private RequestFuture<Void> sendConsumerMetadataRequest() {
         // initiate the consumer metadata request
         // find a node to ask about the coordinator
-        long now = time.milliseconds();
-        Node node = this.client.leastLoadedNode(now);
-
+        Node node = this.client.leastLoadedNode();
         if (node == null) {
-            return RequestFuture.metadataRefreshNeeded();
-        } else if (!this.client.ready(node, now)) {
-            if (this.client.connectionFailed(node)) {
-                return RequestFuture.metadataRefreshNeeded();
-            } else {
-                return RequestFuture.pollNeeded();
-            }
+            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
+            // from configuration?
+            return RequestFuture.noBrokersAvailable();
         } else {
-            final RequestFuture<Void> future = new RequestFuture<Void>();
-
             // create a consumer metadata request
             log.debug("Issuing consumer metadata request to broker {}", node.id());
             ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
-            RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
-                @Override
-                public void onComplete(ClientResponse resp) {
-                    handleConsumerMetadataResponse(resp, future);
-                }
-            };
-            send(node, ApiKeys.CONSUMER_METADATA, metadataRequest.toStruct(), completionHandler, now);
-            return future;
+            return client.send(node, ApiKeys.CONSUMER_METADATA, metadataRequest)
+                    .compose(new RequestFutureAdapter<ClientResponse, Void>() {
+                        @Override
+                        public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
+                            handleConsumerMetadataResponse(response, future);
+                        }
+                    });
         }
     }
 
@@ -391,7 +604,10 @@ public final class Coordinator {
         // parse the response to get the coordinator info if it is not disconnected,
         // otherwise we need to request metadata update
         if (resp.wasDisconnected()) {
-            future.retryAfterMetadataRefresh();
+            future.raise(new DisconnectException());
+        } else if (!coordinatorUnknown()) {
+            // We already found the coordinator, so ignore the request
+            future.complete(null);
         } else {
             ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody());
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
@@ -401,9 +617,10 @@ public final class Coordinator {
                 this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
                         consumerMetadataResponse.node().host(),
                         consumerMetadataResponse.node().port());
+                heartbeatTask.reset();
                 future.complete(null);
             } else {
-                future.retryAfterBackoff();
+                future.raise(Errors.forCode(consumerMetadataResponse.errorCode()));
             }
         }
     }
@@ -418,115 +635,84 @@ public final class Coordinator {
         }
     }
 
-    /**
-     * Handle the case when the request gets cancelled due to coordinator disconnection.
-     */
-    private void handleCoordinatorDisconnect(ClientResponse response) {
-        int correlation = response.request().request().header().correlationId();
-        log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
-                response.request(),
-                correlation,
-                response.request().request().destination());
-
-        // mark the coordinator as dead
-        coordinatorDead();
-    }
-
-
-    private void sendCoordinator(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
-        send(this.consumerCoordinator, api, request, handler, now);
-    }
-
-    private void send(Node node, ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
-        RequestHeader header = this.client.nextRequestHeader(api);
-        RequestSend send = new RequestSend(node.idString(), header, request);
-        this.client.send(new ClientRequest(now, true, send, handler));
-    }
+    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+        @Override
+        public HeartbeatResponse parse(ClientResponse response) {
+            return new HeartbeatResponse(response.responseBody());
+        }
 
-    private class HeartbeatCompletionHandler implements RequestCompletionHandler {
         @Override
-        public void onComplete(ClientResponse resp) {
-            if (resp.wasDisconnected()) {
-                handleCoordinatorDisconnect(resp);
+        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
+            sensors.heartbeatLatency.record(response.requestLatencyMs());
+            short error = heartbeatResponse.errorCode();
+            if (error == Errors.NONE.code()) {
+                log.debug("Received successful heartbeat response.");
+                future.complete(null);
+            } else if (error == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                    || error == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
+                coordinatorDead();
+                future.raise(Errors.forCode(error));
+            } else if (error == Errors.ILLEGAL_GENERATION.code()) {
+                log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
+                subscriptions.needReassignment();
+                future.raise(Errors.ILLEGAL_GENERATION);
+            } else if (error == Errors.UNKNOWN_CONSUMER_ID.code()) {
+                log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
+                consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+                subscriptions.needReassignment();
+                future.raise(Errors.UNKNOWN_CONSUMER_ID);
             } else {
-                HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
-                if (response.errorCode() == Errors.NONE.code()) {
-                    log.debug("Received successful heartbeat response.");
-                } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                        || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                    log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
-                    coordinatorDead();
-                } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
-                    log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
-                    subscriptions.needReassignment();
-                } else if (response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) {
-                    log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
-                    consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-                    subscriptions.needReassignment();
-                } else {
-                    throw new KafkaException("Unexpected error in heartbeat response: "
-                        + Errors.forCode(response.errorCode()).exception().getMessage());
-                }
+                future.raise(new KafkaException("Unexpected error in heartbeat response: "
+                        + Errors.forCode(error).exception().getMessage()));
             }
-            sensors.heartbeatLatency.record(resp.requestLatencyMs());
         }
     }
 
-    private class OffsetCommitCompletionHandler implements RequestCompletionHandler {
+    private abstract class CoordinatorResponseHandler<R, T>
+            extends RequestFutureAdapter<ClientResponse, T> {
+        protected ClientResponse response;
 
-        private final Map<TopicPartition, Long> offsets;
-        private final RequestFuture<Void> future;
+        public abstract R parse(ClientResponse response);
 
-        public OffsetCommitCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
-            this.offsets = offsets;
-            this.future = future;
-        }
+        public abstract void handle(R response, RequestFuture<T> future);
 
         @Override
-        public void onComplete(ClientResponse resp) {
-            if (resp.wasDisconnected()) {
-                handleCoordinatorDisconnect(resp);
-                future.retryWithNewCoordinator();
-            } else {
-                OffsetCommitResponse commitResponse = new OffsetCommitResponse(resp.responseBody());
-                for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
-                    TopicPartition tp = entry.getKey();
-                    short errorCode = entry.getValue();
-                    long offset = this.offsets.get(tp);
-                    if (errorCode == Errors.NONE.code()) {
-                        log.debug("Committed offset {} for partition {}", offset, tp);
-                        subscriptions.committed(tp, offset);
-                    } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                            || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                        coordinatorDead();
-                        future.retryWithNewCoordinator();
-                    } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
-                            || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
-                        // do not need to throw the exception but just log the error
-                        log.error("Error committing partition {} at offset {}: {}",
-                                tp,
-                                offset,
-                                Errors.forCode(errorCode).exception().getMessage());
-                    } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
-                            || errorCode == Errors.ILLEGAL_GENERATION.code()) {
-                        // need to re-join group
-                        subscriptions.needReassignment();
-                    } else {
-                        // re-throw the exception as these should not happen
-                        log.error("Error committing partition {} at offset {}: {}",
-                                tp,
-                                offset,
-                                Errors.forCode(errorCode).exception().getMessage());
-                    }
-                }
+        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
+            this.response = clientResponse;
 
-                if (!future.isDone())
-                    future.complete(null);
+            if (clientResponse.wasDisconnected()) {
+                int correlation = response.request().request().header().correlationId();
+                log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+                        response.request(),
+                        correlation,
+                        response.request().request().destination());
+
+                // mark the coordinator as dead
+                coordinatorDead();
+                future.raise(new DisconnectException());
+                return;
+            }
+
+            R response = parse(clientResponse);
+            handle(response, future);
+        }
+
+        @Override
+        public void onFailure(RuntimeException e, RequestFuture<T> future) {
+            if (e instanceof DisconnectException) {
+                log.debug("Coordinator request failed", e);
+                coordinatorDead();
             }
-            sensors.commitLatency.record(resp.requestLatencyMs());
+            future.raise(e);
         }
     }
 
    /**
     * Callback interface for reacting to changes in this consumer's partition assignment.
     * NOTE(review): presumably invoked around group rebalances (assignment after a
     * successful re-join, revocation before giving partitions up) — confirm against
     * the Coordinator code that fires these callbacks.
     */
    public interface RebalanceCallback {
        /**
         * @param partitions the partitions newly assigned to this consumer
         */
        void onPartitionsAssigned(Collection<TopicPartition> partitions);

        /**
         * @param partitions the partitions being taken away from this consumer
         */
        void onPartitionsRevoked(Collection<TopicPartition> partitions);
    }
+
     private class CoordinatorMetrics {
         public final Metrics metrics;
         public final String metricGrpName;

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
new file mode 100644
index 0000000..61663f8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+
/**
 * A unit of work to be executed at (or after) a scheduled time, driven by a
 * delayed task queue which invokes {@link #run(long)} when the task is due.
 */
public interface DelayedTask {

    /**
     * Execute the task.
     * @param now current time in milliseconds
     */
    void run(long now);
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
new file mode 100644
index 0000000..61cab20
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Tracks a set of tasks to be executed after a delay.
+ */
+public class DelayedTaskQueue {
+
+    private PriorityQueue<Entry> tasks;
+
+    public DelayedTaskQueue() {
+        tasks = new PriorityQueue<Entry>();
+    }
+
+    /**
+     * Schedule a task for execution in the future.
+     *
+     * @param task the task to execute
+     * @param at the time at which to
+     */
+    public void add(DelayedTask task, long at) {
+        tasks.add(new Entry(task, at));
+    }
+
+    /**
+     * Remove a task from the queue if it is present
+     * @param task the task to be removed
+     * @returns true if a task was removed as a result of this call
+     */
+    public boolean remove(DelayedTask task) {
+        boolean wasRemoved = false;
+        Iterator<Entry> iterator = tasks.iterator();
+        while (iterator.hasNext()) {
+            Entry entry = iterator.next();
+            if (entry.task.equals(task)) {
+                iterator.remove();
+                wasRemoved = true;
+            }
+        }
+        return wasRemoved;
+    }
+
+    /**
+     * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled.
+     *
+     * @return the remaining time in milliseconds
+     */
+    public long nextTimeout(long now) {
+        if (tasks.isEmpty())
+            return Long.MAX_VALUE;
+        else
+            return Math.max(tasks.peek().timeout - now, 0);
+    }
+
+    /**
+     * Run any ready tasks.
+     *
+     * @param now the current time
+     */
+    public void poll(long now) {
+        while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
+            Entry entry = tasks.poll();
+            entry.task.run(now);
+        }
+    }
+
+    private static class Entry implements Comparable<Entry> {
+        DelayedTask task;
+        long timeout;
+
+        public Entry(DelayedTask task, long timeout) {
+            this.task = task;
+            this.timeout = timeout;
+        }
+
+        @Override
+        public int compareTo(Entry entry) {
+            return Long.compare(timeout, entry.timeout);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 695eaf6..d595c1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -13,17 +13,18 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -38,7 +39,6 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -52,21 +52,24 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 
 /**
  * This class manage the fetching process with the brokers.
  */
 public class Fetcher<K, V> {
+    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
 
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
 
-    private final KafkaClient client;
-
+    private final ConsumerNetworkClient client;
     private final Time time;
     private final int minBytes;
     private final int maxWaitMs;
     private final int fetchSize;
+    private final long retryBackoffMs;
     private final boolean checkCrcs;
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
@@ -75,8 +78,7 @@ public class Fetcher<K, V> {
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
-
-    public Fetcher(KafkaClient client,
+    public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
                    int maxWaitMs,
                    int fetchSize,
@@ -88,7 +90,8 @@ public class Fetcher<K, V> {
                    Metrics metrics,
                    String metricGrpPrefix,
                    Map<String, String> metricTags,
-                   Time time) {
+                   Time time,
+                   long retryBackoffMs) {
 
         this.time = time;
         this.client = client;
@@ -105,25 +108,105 @@ public class Fetcher<K, V> {
         this.records = new LinkedList<PartitionRecords<K, V>>();
 
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
+        this.retryBackoffMs = retryBackoffMs;
     }
 
     /**
      * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
      *
      * @param cluster The current cluster metadata
-     * @param now The current time
      */
-    public void initFetches(Cluster cluster, long now) {
-        for (ClientRequest request : createFetchRequests(cluster)) {
-            Node node = cluster.nodeById(Integer.parseInt(request.request().destination()));
-            if (client.ready(node, now)) {
-                log.trace("Initiating fetch to node {}: {}", node.id(), request);
-                client.send(request);
+    public void initFetches(Cluster cluster) {
+        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
+            final FetchRequest fetch = fetchEntry.getValue();
+            client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
+                    .addListener(new RequestFutureListener<ClientResponse>() {
+                        @Override
+                        public void onSuccess(ClientResponse response) {
+                            handleFetchResponse(response, fetch);
+                        }
+
+                        @Override
+                        public void onFailure(RuntimeException e) {
+                            log.debug("Fetch failed", e);
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Update the fetch positions for the provided partitions.
+     * @param partitions the partitions for which fetch positions should be updated
+     */
+    public void updateFetchPositions(Set<TopicPartition> partitions) {
+        // reset the fetch position to the committed position
+        for (TopicPartition tp : partitions) {
+            // skip if we already have a fetch position
+            if (subscriptions.fetched(tp) != null)
+                continue;
+
+            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+            if (subscriptions.isOffsetResetNeeded(tp)) {
+                resetOffset(tp);
+            } else if (subscriptions.committed(tp) == null) {
+                // there's no committed position, so we need to reset with the default strategy
+                subscriptions.needOffsetReset(tp);
+                resetOffset(tp);
+            } else {
+                log.debug("Resetting offset for partition {} to the committed offset {}",
+                        tp, subscriptions.committed(tp));
+                subscriptions.seek(tp, subscriptions.committed(tp));
             }
         }
     }
 
     /**
+     * Reset offsets for the given partition using the offset reset strategy.
+     *
+     * @param partition The given partition that needs reset offset
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     */
+    private void resetOffset(TopicPartition partition) {
+        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+        final long timestamp;
+        if (strategy == OffsetResetStrategy.EARLIEST)
+            timestamp = EARLIEST_OFFSET_TIMESTAMP;
+        else if (strategy == OffsetResetStrategy.LATEST)
+            timestamp = LATEST_OFFSET_TIMESTAMP;
+        else
+            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+        log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
+        long offset = listOffset(partition, timestamp);
+        this.subscriptions.seek(partition, offset);
+    }
+
+    /**
+     * Fetch a single offset before the given timestamp for the partition.
+     *
+     * @param partition The partition that needs fetching offset.
+     * @param timestamp The timestamp for fetching offset.
+     * @return The offset of the message that is published before the given timestamp
+     */
+    private long listOffset(TopicPartition partition, long timestamp) {
+        while (true) {
+            RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
+            client.poll(future);
+
+            if (future.succeeded())
+                return future.value();
+
+            if (!future.isRetriable())
+                throw future.exception();
+
+            if (future.exception() instanceof InvalidMetadataException)
+                client.awaitMetadataUpdate();
+            else
+                Utils.sleep(retryBackoffMs);
+        }
+    }
+
+    /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
      * @return The fetched records per partition
@@ -163,37 +246,27 @@ public class Fetcher<K, V> {
      * @param timestamp The timestamp for fetching offset.
      * @return A response which can be polled to obtain the corresponding offset.
      */
-    public RequestFuture<Long> listOffset(final TopicPartition topicPartition, long timestamp) {
+    private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
         Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
         partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
-        long now = time.milliseconds();
         PartitionInfo info = metadata.fetch().partition(topicPartition);
         if (info == null) {
             metadata.add(topicPartition.topic());
             log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
-            return RequestFuture.metadataRefreshNeeded();
+            return RequestFuture.staleMetadata();
         } else if (info.leader() == null) {
             log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
-            return RequestFuture.metadataRefreshNeeded();
-        } else if (this.client.ready(info.leader(), now)) {
-            final RequestFuture<Long> future = new RequestFuture<Long>();
+            return RequestFuture.leaderNotAvailable();
+        } else {
             Node node = info.leader();
             ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
-            RequestSend send = new RequestSend(node.idString(),
-                    this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
-                    request.toStruct());
-            RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
-                @Override
-                public void onComplete(ClientResponse resp) {
-                    handleListOffsetResponse(topicPartition, resp, future);
-                }
-            };
-            ClientRequest clientRequest = new ClientRequest(now, true, send, completionHandler);
-            this.client.send(clientRequest);
-            return future;
-        } else {
-            // We initiated a connect to the leader, but we need to poll to finish it.
-            return RequestFuture.pollNeeded();
+            return client.send(node, ApiKeys.LIST_OFFSETS, request)
+                    .compose(new RequestFutureAdapter<ClientResponse, Long>() {
+                        @Override
+                        public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
+                            handleListOffsetResponse(topicPartition, response, future);
+                        }
+                    });
         }
     }
 
@@ -206,7 +279,7 @@ public class Fetcher<K, V> {
                                           ClientResponse clientResponse,
                                           RequestFuture<Long> future) {
         if (clientResponse.wasDisconnected()) {
-            future.retryAfterMetadataRefresh();
+            future.raise(new DisconnectException());
         } else {
             ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
             short errorCode = lor.responseData().get(topicPartition).errorCode;
@@ -222,11 +295,11 @@ public class Fetcher<K, V> {
                     || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                 log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                         topicPartition);
-                future.retryAfterMetadataRefresh();
+                future.raise(Errors.forCode(errorCode));
             } else {
                 log.error("Attempt to fetch offsets for partition {} failed due to: {}",
                         topicPartition, Errors.forCode(errorCode).exception().getMessage());
-                future.retryAfterMetadataRefresh();
+                future.raise(new StaleMetadataException());
             }
         }
     }
@@ -235,37 +308,31 @@ public class Fetcher<K, V> {
      * Create fetch requests for all nodes for which we have assigned partitions
      * that have no existing requests in flight.
      */
-    private List<ClientRequest> createFetchRequests(Cluster cluster) {
+    private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
         // create the fetch info
-        Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
+        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
         for (TopicPartition partition : subscriptions.assignedPartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
-            } else if (this.client.inFlightRequestCount(node.idString()) == 0) {
+            } else if (this.client.pendingRequestCount(node) == 0) {
                 // if there is a leader and no in-flight requests, issue a new fetch
-                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
+                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                 if (fetch == null) {
                     fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
-                    fetchable.put(node.id(), fetch);
+                    fetchable.put(node, fetch);
                 }
                 long offset = this.subscriptions.fetched(partition);
                 fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
             }
         }
 
-        // create the requests
-        List<ClientRequest> requests = new ArrayList<ClientRequest>(fetchable.size());
-        for (Map.Entry<Integer, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
-            int nodeId = entry.getKey();
-            final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
-            RequestSend send = new RequestSend(Integer.toString(nodeId), this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
-            RequestCompletionHandler handler = new RequestCompletionHandler() {
-                public void onComplete(ClientResponse response) {
-                    handleFetchResponse(response, fetch);
-                }
-            };
-            requests.add(new ClientRequest(time.milliseconds(), true, send, handler));
+        // create the fetches
+        Map<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
+        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
+            Node node = entry.getKey();
+            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
+            requests.put(node, fetch);
         }
         return requests;
     }
@@ -353,7 +420,6 @@ public class Fetcher<K, V> {
         }
     }
 
-
     private class FetchManagerMetrics {
         public final Metrics metrics;
         public final String metricGrpName;

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 51eae19..6da8936 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -13,7 +13,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 /**
- * A helper class for managing the heartbeat to the co-ordinator
+ * A helper class for managing the heartbeat to the coordinator
  */
 public final class Heartbeat {
     
@@ -25,18 +25,24 @@ public final class Heartbeat {
 
     private final long timeout;
     private long lastHeartbeatSend;
+    private long lastHeartbeatReceive;
+    private long lastSessionReset;
 
     public Heartbeat(long timeout, long now) {
         this.timeout = timeout;
-        this.lastHeartbeatSend = now;
+        this.lastSessionReset = now;
     }
 
     public void sentHeartbeat(long now) {
         this.lastHeartbeatSend = now;
     }
 
+    public void receiveHeartbeat(long now) {
+        this.lastHeartbeatReceive = now;
+    }
+
     public boolean shouldHeartbeat(long now) {
-        return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
+        return timeToNextHeartbeat(now) == 0;
     }
     
     public long lastHeartbeatSend() {
@@ -44,7 +50,7 @@ public final class Heartbeat {
     }
 
     public long timeToNextHeartbeat(long now) {
-        long timeSinceLastHeartbeat = now - lastHeartbeatSend;
+        long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
 
         long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
         if (timeSinceLastHeartbeat > hbInterval)
@@ -52,4 +58,17 @@ public final class Heartbeat {
         else
             return hbInterval - timeSinceLastHeartbeat;
     }
+
+    public boolean sessionTimeoutExpired(long now) {
+        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
+    }
+
+    public long interval() {
+        return timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+    }
+
+    public void resetSessionTimeout(long now) {
+        this.lastSessionReset = now;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
new file mode 100644
index 0000000..0ec6017
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InvalidMetadataException;
+
+/**
+ * No brokers were available to complete a request.
+ */
+public class NoAvailableBrokersException extends InvalidMetadataException {
+    private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 13fc9af..5f00251 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -12,78 +12,49 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.ArrayList;
+import java.util.List;
+
 /**
- * Result of an asynchronous request through {@link org.apache.kafka.clients.KafkaClient}. To get the
- * result of the request, you must use poll using {@link org.apache.kafka.clients.KafkaClient#poll(long, long)}
- * until {@link #isDone()} returns true. Typical usage might look like this:
+ * Result of an asynchronous request from {@link ConsumerNetworkClient}. Use {@link ConsumerNetworkClient#poll(long)}
+ * (and variants) to finish a request future. Use {@link #isDone()} to check if the future is complete, and
+ * {@link #succeeded()} to check if the request completed successfully. Typical usage might look like this:
  *
  * <pre>
- *     RequestFuture future = sendRequest();
- *     while (!future.isDone()) {
- *         client.poll(timeout, now);
- *     }
+ *     RequestFuture<ClientResponse> future = client.send(api, request);
+ *     client.poll(future);
  *
- *     switch (future.outcome()) {
- *     case SUCCESS:
- *         // handle request success
- *         break;
- *     case NEED_RETRY:
- *         // retry after taking possible retry action
- *         break;
- *     case EXCEPTION:
- *         // handle exception
-  *     }
+ *     if (future.succeeded()) {
+ *         ClientResponse response = future.value();
+ *         // Handle response
+ *     } else {
+ *         throw future.exception();
+ *     }
  * </pre>
  *
- * When {@link #isDone()} returns true, there are three possible outcomes (obtained through {@link #outcome()}):
- *
- * <ol>
- * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#SUCCESS}: If the request was
- *    successful, then you can use {@link #value()} to obtain the result.</li>
- * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#EXCEPTION}: If an unhandled exception
- *    was encountered, you can use {@link #exception()} to get it.</li>
- * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#NEED_RETRY}: The request may
- *    not have been successful, but the failure may be ephemeral and the caller just needs to try the request again.
- *    In this case, use {@link #retryAction()} to determine what action should be taken (if any) before
- *    retrying.</li>
- * </ol>
- *
  * @param <T> Return type of the result (Can be Void if there is no response)
  */
 public class RequestFuture<T> {
-    public static final RequestFuture<Object> NEED_NEW_COORDINATOR = newRetryFuture(RetryAction.FIND_COORDINATOR);
-    public static final RequestFuture<Object> NEED_POLL = newRetryFuture(RetryAction.POLL);
-    public static final RequestFuture<Object> NEED_METADATA_REFRESH = newRetryFuture(RetryAction.REFRESH_METADATA);
-
-    public enum RetryAction {
-        NOOP,             // Retry immediately.
-        POLL,             // Retry after calling poll (e.g. to finish a connection)
-        BACKOFF,          // Retry after a delay
-        FIND_COORDINATOR, // Find a new coordinator before retrying
-        REFRESH_METADATA  // Refresh metadata before retrying
-    }
-
-    public enum Outcome {
-        SUCCESS,
-        NEED_RETRY,
-        EXCEPTION
-    }
 
-    private Outcome outcome;
-    private RetryAction retryAction;
+    private boolean isDone = false;
     private T value;
     private RuntimeException exception;
+    private List<RequestFutureListener<T>> listeners = new ArrayList<RequestFutureListener<T>>();
+
 
     /**
      * Check whether the response is ready to be handled
      * @return true if the response is ready, false otherwise
      */
     public boolean isDone() {
-        return outcome != null;
+        return isDone;
     }
 
     /**
-     * Get the value corresponding to this request (if it has one, as indicated by {@link #outcome()}).
+     * Get the value corresponding to this request (only available if the request succeeded)
      * @return the value if it exists or null
      */
     public T value() {
@@ -92,32 +63,31 @@ public class RequestFuture<T> {
 
     /**
      * Check if the request succeeded;
-     * @return true if a value is available, false otherwise
+     * @return true if the request completed and was successful
      */
     public boolean succeeded() {
-        return outcome == Outcome.SUCCESS;
+        return isDone && exception == null;
     }
 
     /**
-     * Check if the request completed failed.
-     * @return true if the request failed (whether or not it can be retried)
+     * Check if the request failed.
+     * @return true if the request completed with a failure
      */
     public boolean failed() {
-        return outcome != Outcome.SUCCESS;
+        return isDone && exception != null;
     }
 
     /**
-     * Return the error from this response (assuming {@link #succeeded()} has returned false. If the
-     * response is not ready or if there is no retryAction, null is returned.
-     * @return the error if it exists or null
+     * Check if the request is retriable (convenience method for checking if
+     * the exception is an instance of {@link RetriableException}).
+     * @return true if it is retriable, false otherwise
      */
-    public RetryAction retryAction() {
-        return retryAction;
+    public boolean isRetriable() {
+        return exception instanceof RetriableException;
     }
 
     /**
-     * Get the exception from a failed result. You should check that there is an exception
-     * with {@link #hasException()} before using this method.
+     * Get the exception from a failed result (only available if the request failed)
      * @return The exception if it exists or null
      */
     public RuntimeException exception() {
@@ -125,85 +95,108 @@ public class RequestFuture<T> {
     }
 
     /**
-     * Check whether there was an exception.
-     * @return true if this request failed with an exception
+     * Complete the request successfully. After this call, {@link #succeeded()} will return true
+     * and the value can be obtained through {@link #value()}.
+     * @param value corresponding value (or null if there is none)
      */
-    public boolean hasException() {
-        return outcome == Outcome.EXCEPTION;
+    public void complete(T value) {
+        this.value = value;
+        this.isDone = true;
+        fireSuccess();
     }
 
     /**
-     * Check the outcome of the future if it is ready.
-     * @return the outcome or null if the future is not finished
+     * Raise an exception. The request will be marked as failed, and the caller can either
+     * handle the exception or throw it.
+     * @param e corresponding exception to be passed to caller
      */
-    public Outcome outcome() {
-        return outcome;
+    public void raise(RuntimeException e) {
+        this.exception = e;
+        this.isDone = true;
+        fireFailure();
     }
 
     /**
-     * The request failed, but should be retried using the provided retry action.
-     * @param retryAction The action that should be taken by the caller before retrying the request
+     * Raise an error. The request will be marked as failed.
+     * @param error corresponding error to be passed to caller
      */
-    public void retry(RetryAction retryAction) {
-        this.outcome = Outcome.NEED_RETRY;
-        this.retryAction = retryAction;
-    }
-
-    public void retryNow() {
-        retry(RetryAction.NOOP);
-    }
-
-    public void retryAfterBackoff() {
-        retry(RetryAction.BACKOFF);
+    public void raise(Errors error) {
+        raise(error.exception());
     }
 
-    public void retryWithNewCoordinator() {
-        retry(RetryAction.FIND_COORDINATOR);
+    private void fireSuccess() {
+        for (RequestFutureListener listener: listeners)
+            listener.onSuccess(value);
     }
 
-    public void retryAfterMetadataRefresh() {
-        retry(RetryAction.REFRESH_METADATA);
+    private void fireFailure() {
+        for (RequestFutureListener listener: listeners)
+            listener.onFailure(exception);
     }
 
     /**
-     * Complete the request successfully. After this call, {@link #succeeded()} will return true
-     * and the value can be obtained through {@link #value()}.
-     * @param value corresponding value (or null if there is none)
+     * Add a listener which will be notified when the future completes
+     * @param listener the listener to invoke on success or failure
      */
-    public void complete(T value) {
-        this.outcome = Outcome.SUCCESS;
-        this.value = value;
+    public void addListener(RequestFutureListener<T> listener) {
+        if (isDone) {
+            if (exception != null)
+                listener.onFailure(exception);
+            else
+                listener.onSuccess(value);
+        } else {
+            this.listeners.add(listener);
+        }
     }
 
     /**
-     * Raise an exception. The request will be marked as failed, and the caller can either
-     * handle the exception or throw it.
-     * @param e The exception that
+     * Convert from a request future of one type to another type
+     * @param adapter The adapter which does the conversion
+     * @param <S> The type of the future adapted to
+     * @return The new future
      */
-    public void raise(RuntimeException e) {
-        this.outcome = Outcome.EXCEPTION;
-        this.exception = e;
+    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
+        final RequestFuture<S> adapted = new RequestFuture<S>();
+        addListener(new RequestFutureListener<T>() {
+            @Override
+            public void onSuccess(T value) {
+                adapter.onSuccess(value, adapted);
+            }
+
+            @Override
+            public void onFailure(RuntimeException e) {
+                adapter.onFailure(e, adapted);
+            }
+        });
+        return adapted;
+    }
+
+    public static <T> RequestFuture<T> failure(RuntimeException e) {
+        RequestFuture<T> future = new RequestFuture<T>();
+        future.raise(e);
+        return future;
+    }
+
+    public static RequestFuture<Void> voidSuccess() {
+        RequestFuture<Void> future = new RequestFuture<Void>();
+        future.complete(null);
+        return future;
     }
 
-    private static <T> RequestFuture<T> newRetryFuture(RetryAction retryAction) {
-        RequestFuture<T> result = new RequestFuture<T>();
-        result.retry(retryAction);
-        return result;
+    public static <T> RequestFuture<T> coordinatorNotAvailable() {
+        return failure(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
     }
 
-    @SuppressWarnings("unchecked")
-    public static <T> RequestFuture<T> pollNeeded() {
-        return (RequestFuture<T>) NEED_POLL;
+    public static <T> RequestFuture<T> leaderNotAvailable() {
+        return failure(Errors.LEADER_NOT_AVAILABLE.exception());
     }
 
-    @SuppressWarnings("unchecked")
-    public static <T> RequestFuture<T> metadataRefreshNeeded() {
-        return (RequestFuture<T>) NEED_METADATA_REFRESH;
+    public static <T> RequestFuture<T> noBrokersAvailable() {
+        return failure(new NoAvailableBrokersException());
     }
 
-    @SuppressWarnings("unchecked")
-    public static <T> RequestFuture<T> newCoordinatorNeeded() {
-        return (RequestFuture<T>) NEED_NEW_COORDINATOR;
+    public static <T> RequestFuture<T> staleMetadata() {
+        return failure(new StaleMetadataException());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
new file mode 100644
index 0000000..cc5322f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Adapt from a request future of one type to another.
+ *
+ * @param <F> Type to adapt from
+ * @param <T> Type to adapt to
+ */
+public abstract class RequestFutureAdapter<F, T> {
+
+    public abstract void onSuccess(F value, RequestFuture<T> future);
+
+    public void onFailure(RuntimeException e, RequestFuture<T> future) {
+        future.raise(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
new file mode 100644
index 0000000..b39261b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Listener interface to hook into RequestFuture completion.
+ */
+public interface RequestFutureListener<T> {
+
+    void onSuccess(T value);
+
+    void onFailure(RuntimeException e);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
new file mode 100644
index 0000000..3312a2c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.RetriableException;
+
+/**
+ * Exception used in {@link ConsumerNetworkClient} to indicate the failure
+ * to transmit a request to the networking layer. This could be either because
+ * the client is still connecting to the given host or its send buffer is full.
+ */
+public class SendFailedException extends RetriableException {
+    public static final SendFailedException INSTANCE = new SendFailedException();
+
+    private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
new file mode 100644
index 0000000..09114cb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InvalidMetadataException;
+
+/**
+ * Thrown when metadata is old and needs to be refreshed.
+ */
+public class StaleMetadataException extends InvalidMetadataException {
+    private static final long serialVersionUID = 1L;
+}


Mime
View raw message