kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/4] kafka git commit: KAFKA-3888: send consumer heartbeats from a background thread (KIP-62)
Date Wed, 17 Aug 2016 18:50:10 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b65a5b7..07edd3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
@@ -36,27 +37,34 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Higher level consumer access to the network layer with basic support for futures and
- * task scheduling. This class is not thread-safe, except for wakeup().
+ * Higher level consumer access to the network layer with basic support for request futures. This class
+ * is thread-safe, but provides no synchronization for response callbacks. This guarantees that no locks
+ * are held when they are invoked.
  */
 public class ConsumerNetworkClient implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
 
+    // the mutable state of this class is protected by the object's monitor (excluding the wakeup
+    // flag and the request completion queue below).
     private final KafkaClient client;
-    private final AtomicBoolean wakeup = new AtomicBoolean(false);
-    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
     private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
     private final Metadata metadata;
     private final Time time;
     private final long retryBackoffMs;
     private final long unsentExpiryMs;
-
-    // this count is only accessed from the consumer's main thread
     private int wakeupDisabledCount = 0;
 
+    // when requests complete, they are transferred to this queue prior to invocation. The purpose
+    // is to avoid invoking them while holding the lock above.
+    private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
+
+    // this flag allows the client to be safely woken up without waiting on the lock above. It is
+    // atomic to avoid the need to acquire the lock above in order to enable it concurrently.
+    private final AtomicBoolean wakeup = new AtomicBoolean(false);
 
     public ConsumerNetworkClient(KafkaClient client,
                                  Metadata metadata,
@@ -71,25 +79,6 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
-     * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and
-     * should only be used for coarse synchronization.
-     * @param task The task to be scheduled
-     * @param at The time it should run
-     */
-    public void schedule(DelayedTask task, long at) {
-        delayedTasks.add(task, at);
-    }
-
-    /**
-     * Unschedule a task. This will remove all instances of the task from the task queue.
-     * This is a no-op if the task is not scheduled.
-     * @param task The task to be unscheduled.
-     */
-    public void unschedule(DelayedTask task) {
-        delayedTasks.remove(task);
-    }
-
-    /**
      * Send a new request. Note that the request is not actually transmitted on the
      * network until one of the {@link #poll(long)} variants is invoked. At this
      * point the request will either be transmitted successfully or will fail.
@@ -104,25 +93,36 @@ public class ConsumerNetworkClient implements Closeable {
     public RequestFuture<ClientResponse> send(Node node,
                                               ApiKeys api,
                                               AbstractRequest request) {
+        return send(node, api, ProtoUtils.latestVersion(api.id), request);
+    }
+
+    private RequestFuture<ClientResponse> send(Node node,
+                                              ApiKeys api,
+                                              short version,
+                                              AbstractRequest request) {
         long now = time.milliseconds();
-        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
-        RequestHeader header = client.nextRequestHeader(api);
+        RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
+        RequestHeader header = client.nextRequestHeader(api, version);
         RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
-        put(node, new ClientRequest(now, true, send, future));
-        return future;
+        put(node, new ClientRequest(now, true, send, completionHandler));
+        return completionHandler.future;
     }
 
     private void put(Node node, ClientRequest request) {
-        List<ClientRequest> nodeUnsent = unsent.get(node);
-        if (nodeUnsent == null) {
-            nodeUnsent = new ArrayList<>();
-            unsent.put(node, nodeUnsent);
+        synchronized (this) {
+            List<ClientRequest> nodeUnsent = unsent.get(node);
+            if (nodeUnsent == null) {
+                nodeUnsent = new ArrayList<>();
+                unsent.put(node, nodeUnsent);
+            }
+            nodeUnsent.add(request);
         }
-        nodeUnsent.add(request);
     }
 
     public Node leastLoadedNode() {
-        return client.leastLoadedNode(time.milliseconds());
+        synchronized (this) {
+            return client.leastLoadedNode(time.milliseconds());
+        }
     }
 
     /**
@@ -149,6 +149,8 @@ public class ConsumerNetworkClient implements Closeable {
      * on the current poll if one is active, or the next poll.
      */
     public void wakeup() {
+        // wakeup should be safe without holding the client lock since it simply delegates to
+        // Selector's wakeup, which is threadsafe
         this.wakeup.set(true);
         this.client.wakeup();
     }
@@ -175,7 +177,7 @@ public class ConsumerNetworkClient implements Closeable {
         long remaining = timeout;
         long now = begin;
         do {
-            poll(remaining, now, true);
+            poll(remaining, now);
             now = time.milliseconds();
             long elapsed = now - begin;
             remaining = timeout - elapsed;
@@ -189,7 +191,7 @@ public class ConsumerNetworkClient implements Closeable {
      * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public void poll(long timeout) {
-        poll(timeout, time.milliseconds(), true);
+        poll(timeout, time.milliseconds());
     }
 
     /**
@@ -198,7 +200,37 @@ public class ConsumerNetworkClient implements Closeable {
      * @param now current time in milliseconds
      */
     public void poll(long timeout, long now) {
-        poll(timeout, now, true);
+        // there may be handlers which need to be invoked if we woke up the previous call to poll
+        firePendingCompletedRequests();
+
+        synchronized (this) {
+            // send all the requests we can send now
+            trySend(now);
+
+            // ensure we don't poll any longer than the deadline for
+            // the next scheduled task
+            client.poll(timeout, now);
+            now = time.milliseconds();
+
+            // handle any disconnects by failing the active requests. note that disconnects must
+            // be checked immediately following poll since any subsequent call to client.ready()
+            // will reset the disconnect status
+            checkDisconnects(now);
+
+            // trigger wakeups after checking for disconnects so that the callbacks will be ready
+            // to be fired on the next call to poll()
+            maybeTriggerWakeup();
+
+            // try again to send requests since buffer space may have been
+            // cleared or a connect finished in the poll
+            trySend(now);
+
+            // fail requests that couldn't be sent if they have expired
+            failExpiredRequests(now);
+        }
+
+        // called without the lock to avoid deadlock potential if handlers need to acquire locks
+        firePendingCompletedRequests();
     }
 
     /**
@@ -208,49 +240,12 @@ public class ConsumerNetworkClient implements Closeable {
     public void pollNoWakeup() {
         disableWakeups();
         try {
-            poll(0, time.milliseconds(), false);
+            poll(0, time.milliseconds());
         } finally {
             enableWakeups();
         }
     }
 
-    private void poll(long timeout, long now, boolean executeDelayedTasks) {
-        // send all the requests we can send now
-        trySend(now);
-
-        // ensure we don't poll any longer than the deadline for
-        // the next scheduled task
-        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
-        clientPoll(timeout, now);
-        now = time.milliseconds();
-
-        // handle any disconnects by failing the active requests. note that disconnects must
-        // be checked immediately following poll since any subsequent call to client.ready()
-        // will reset the disconnect status
-        checkDisconnects(now);
-
-        // execute scheduled tasks
-        if (executeDelayedTasks)
-            delayedTasks.poll(now);
-
-        // try again to send requests since buffer space may have been
-        // cleared or a connect finished in the poll
-        trySend(now);
-
-        // fail requests that couldn't be sent if they have expired
-        failExpiredRequests(now);
-    }
-
-    /**
-     * Execute delayed tasks now.
-     * @param now current time in milliseconds
-     * @throws WakeupException if a wakeup has been requested
-     */
-    public void executeDelayedTasks(long now) {
-        delayedTasks.poll(now);
-        maybeTriggerWakeup();
-    }
-
     /**
      * Block until all pending requests from the given node have finished.
      * @param node The node to await requests from
@@ -267,9 +262,11 @@ public class ConsumerNetworkClient implements Closeable {
      * @return The number of pending requests
      */
     public int pendingRequestCount(Node node) {
-        List<ClientRequest> pending = unsent.get(node);
-        int unsentCount = pending == null ? 0 : pending.size();
-        return unsentCount + client.inFlightRequestCount(node.idString());
+        synchronized (this) {
+            List<ClientRequest> pending = unsent.get(node);
+            int unsentCount = pending == null ? 0 : pending.size();
+            return unsentCount + client.inFlightRequestCount(node.idString());
+        }
     }
 
     /**
@@ -278,10 +275,22 @@ public class ConsumerNetworkClient implements Closeable {
      * @return The total count of pending requests
      */
     public int pendingRequestCount() {
-        int total = 0;
-        for (List<ClientRequest> requests: unsent.values())
-            total += requests.size();
-        return total + client.inFlightRequestCount();
+        synchronized (this) {
+            int total = 0;
+            for (List<ClientRequest> requests: unsent.values())
+                total += requests.size();
+            return total + client.inFlightRequestCount();
+        }
+    }
+
+    private void firePendingCompletedRequests() {
+        for (;;) {
+            RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
+            if (completionHandler == null)
+                break;
+
+            completionHandler.fireCompletion();
+        }
     }
 
     private void checkDisconnects(long now) {
@@ -315,9 +324,8 @@ public class ConsumerNetworkClient implements Closeable {
             while (requestIterator.hasNext()) {
                 ClientRequest request = requestIterator.next();
                 if (request.createdTimeMs() < now - unsentExpiryMs) {
-                    RequestFutureCompletionHandler handler =
-                            (RequestFutureCompletionHandler) request.callback();
-                    handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
+                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+                    handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
                     requestIterator.remove();
                 } else
                     break;
@@ -327,15 +335,20 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    protected void failUnsentRequests(Node node, RuntimeException e) {
+    public void failUnsentRequests(Node node, RuntimeException e) {
         // clear unsent requests to node and fail their corresponding futures
-        List<ClientRequest> unsentRequests = unsent.remove(node);
-        if (unsentRequests != null) {
-            for (ClientRequest request : unsentRequests) {
-                RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
-                handler.raise(e);
+        synchronized (this) {
+            List<ClientRequest> unsentRequests = unsent.remove(node);
+            if (unsentRequests != null) {
+                for (ClientRequest request : unsentRequests) {
+                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+                    handler.onFailure(e);
+                }
             }
         }
+
+        // called without the lock to avoid deadlock potential
+        firePendingCompletedRequests();
     }
 
     private boolean trySend(long now) {
@@ -356,11 +369,6 @@ public class ConsumerNetworkClient implements Closeable {
         return requestsSent;
     }
 
-    private void clientPoll(long timeout, long now) {
-        client.poll(timeout, now);
-        maybeTriggerWakeup();
-    }
-
     private void maybeTriggerWakeup() {
         if (wakeupDisabledCount == 0 && wakeup.get()) {
             wakeup.set(false);
@@ -369,24 +377,30 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     public void disableWakeups() {
-        wakeupDisabledCount++;
+        synchronized (this) {
+            wakeupDisabledCount++;
+        }
     }
 
     public void enableWakeups() {
-        if (wakeupDisabledCount <= 0)
-            throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
+        synchronized (this) {
+            if (wakeupDisabledCount <= 0)
+                throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
 
-        wakeupDisabledCount--;
+            wakeupDisabledCount--;
 
-        // re-wakeup the client if the flag was set since previous wake-up call
-        // could be cleared by poll(0) while wakeups were disabled
-        if (wakeupDisabledCount == 0 && wakeup.get())
-            this.client.wakeup();
+            // re-wakeup the client if the flag was set since previous wake-up call
+            // could be cleared by poll(0) while wakeups were disabled
+            if (wakeupDisabledCount == 0 && wakeup.get())
+                this.client.wakeup();
+        }
     }
 
     @Override
     public void close() throws IOException {
-        client.close();
+        synchronized (this) {
+            client.close();
+        }
     }
 
     /**
@@ -395,7 +409,9 @@ public class ConsumerNetworkClient implements Closeable {
      * @param node Node to connect to if possible
      */
     public boolean connectionFailed(Node node) {
-        return client.connectionFailed(node);
+        synchronized (this) {
+            return client.connectionFailed(node);
+        }
     }
 
     /**
@@ -405,26 +421,45 @@ public class ConsumerNetworkClient implements Closeable {
      * @param node The node to connect to
      */
     public void tryConnect(Node node) {
-        client.ready(node, time.milliseconds());
+        synchronized (this) {
+            client.ready(node, time.milliseconds());
+        }
     }
 
-    public static class RequestFutureCompletionHandler
-            extends RequestFuture<ClientResponse>
-            implements RequestCompletionHandler {
+    public class RequestFutureCompletionHandler implements RequestCompletionHandler {
+        private final RequestFuture<ClientResponse> future;
+        private ClientResponse response;
+        private RuntimeException e;
 
-        @Override
-        public void onComplete(ClientResponse response) {
-            if (response.wasDisconnected()) {
+        public RequestFutureCompletionHandler() {
+            this.future = new RequestFuture<>();
+        }
+
+        public void fireCompletion() {
+            if (e != null) {
+                future.raise(e);
+            } else if (response.wasDisconnected()) {
                 ClientRequest request = response.request();
                 RequestSend send = request.request();
                 ApiKeys api = ApiKeys.forId(send.header().apiKey());
                 int correlation = send.header().correlationId();
                 log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                         api, request, correlation, send.destination());
-                raise(DisconnectException.INSTANCE);
+                future.raise(DisconnectException.INSTANCE);
             } else {
-                complete(response);
+                future.complete(response);
             }
         }
+
+        public void onFailure(RuntimeException e) {
+            this.e = e;
+            pendingCompletion.add(this);
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            this.response = response;
+            pendingCompletion.add(this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
deleted file mode 100644
index 61663f8..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer.internals;
-
-
-public interface DelayedTask {
-
-    /**
-     * Execute the task.
-     * @param now current time in milliseconds
-     */
-    void run(long now);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
deleted file mode 100644
index 61cab20..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer.internals;
-
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
-/**
- * Tracks a set of tasks to be executed after a delay.
- */
-public class DelayedTaskQueue {
-
-    private PriorityQueue<Entry> tasks;
-
-    public DelayedTaskQueue() {
-        tasks = new PriorityQueue<Entry>();
-    }
-
-    /**
-     * Schedule a task for execution in the future.
-     *
-     * @param task the task to execute
-     * @param at the time at which to
-     */
-    public void add(DelayedTask task, long at) {
-        tasks.add(new Entry(task, at));
-    }
-
-    /**
-     * Remove a task from the queue if it is present
-     * @param task the task to be removed
-     * @returns true if a task was removed as a result of this call
-     */
-    public boolean remove(DelayedTask task) {
-        boolean wasRemoved = false;
-        Iterator<Entry> iterator = tasks.iterator();
-        while (iterator.hasNext()) {
-            Entry entry = iterator.next();
-            if (entry.task.equals(task)) {
-                iterator.remove();
-                wasRemoved = true;
-            }
-        }
-        return wasRemoved;
-    }
-
-    /**
-     * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled.
-     *
-     * @return the remaining time in milliseconds
-     */
-    public long nextTimeout(long now) {
-        if (tasks.isEmpty())
-            return Long.MAX_VALUE;
-        else
-            return Math.max(tasks.peek().timeout - now, 0);
-    }
-
-    /**
-     * Run any ready tasks.
-     *
-     * @param now the current time
-     */
-    public void poll(long now) {
-        while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
-            Entry entry = tasks.poll();
-            entry.task.run(now);
-        }
-    }
-
-    private static class Entry implements Comparable<Entry> {
-        DelayedTask task;
-        long timeout;
-
-        public Entry(DelayedTask task, long timeout) {
-            this.task = task;
-            this.timeout = timeout;
-        }
-
-        @Override
-        public int compareTo(Entry entry) {
-            return Long.compare(timeout, entry.timeout);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 913ce9e..84278c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -65,6 +65,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * This class manage the fetching process with the brokers.
@@ -84,7 +85,7 @@ public class Fetcher<K, V> {
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
-    private final List<CompletedFetch> completedFetches;
+    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
@@ -115,7 +116,7 @@ public class Fetcher<K, V> {
         this.checkCrcs = checkCrcs;
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
-        this.completedFetches = new ArrayList<>();
+        this.completedFetches = new ConcurrentLinkedQueue<>();
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
     }
@@ -127,7 +128,8 @@ public class Fetcher<K, V> {
     public void sendFetches() {
         for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
             final FetchRequest request = fetchEntry.getValue();
-            client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
+            final Node fetchTarget = fetchEntry.getKey();
+            client.send(fetchTarget, ApiKeys.FETCH, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
@@ -148,7 +150,7 @@ public class Fetcher<K, V> {
 
                         @Override
                         public void onFailure(RuntimeException e) {
-                            log.debug("Fetch failed", e);
+                            log.debug("Fetch request to {} failed", fetchTarget, e);
                         }
                     });
         }
@@ -353,16 +355,14 @@ public class Fetcher<K, V> {
         } else {
             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
             int recordsRemaining = maxPollRecords;
-            Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
 
             while (recordsRemaining > 0) {
                 if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
-                    if (!completedFetchesIterator.hasNext())
+                    CompletedFetch completedFetch = completedFetches.poll();
+                    if (completedFetch == null)
                         break;
 
-                    CompletedFetch completion = completedFetchesIterator.next();
-                    completedFetchesIterator.remove();
-                    nextInLineRecords = parseFetchedData(completion);
+                    nextInLineRecords = parseFetchedData(completedFetch);
                 } else {
                     recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
                 }
@@ -510,6 +510,8 @@ public class Fetcher<K, V> {
                 long position = this.subscriptions.position(partition);
                 fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
                 log.trace("Added fetch request for partition {} at offset {}", partition, position);
+            } else {
+                log.trace("Skipping fetch for partition {} because there is an inflight request to {}", partition, node);
             }
         }
 
@@ -845,4 +847,5 @@ public class Fetcher<K, V> {
             recordsFetched.record(records);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 79e17e2..dff1006 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -16,26 +16,41 @@ package org.apache.kafka.clients.consumer.internals;
  * A helper class for managing the heartbeat to the coordinator
  */
 public final class Heartbeat {
-    private final long timeout;
-    private final long interval;
+    private final long sessionTimeout;
+    private final long heartbeatInterval;
+    private final long maxPollInterval;
+    private final long retryBackoffMs;
 
-    private long lastHeartbeatSend;
+    private volatile long lastHeartbeatSend; // volatile since it is read by metrics
     private long lastHeartbeatReceive;
     private long lastSessionReset;
+    private long lastPoll;
+    private boolean heartbeatFailed;
 
-    public Heartbeat(long timeout,
-                     long interval,
-                     long now) {
-        if (interval >= timeout)
+    public Heartbeat(long sessionTimeout,
+                     long heartbeatInterval,
+                     long maxPollInterval,
+                     long retryBackoffMs) {
+        if (heartbeatInterval >= sessionTimeout)
             throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
 
-        this.timeout = timeout;
-        this.interval = interval;
-        this.lastSessionReset = now;
+        this.sessionTimeout = sessionTimeout;
+        this.heartbeatInterval = heartbeatInterval;
+        this.maxPollInterval = maxPollInterval;
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    public void poll(long now) {
+        this.lastPoll = now;
     }
 
     public void sentHeartbeat(long now) {
         this.lastHeartbeatSend = now;
+        this.heartbeatFailed = false;
+    }
+
+    public void failHeartbeat() {
+        this.heartbeatFailed = true;
     }
 
     public void receiveHeartbeat(long now) {
@@ -52,23 +67,34 @@ public final class Heartbeat {
 
     public long timeToNextHeartbeat(long now) {
         long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
+        final long delayToNextHeartbeat;
+        if (heartbeatFailed)
+            delayToNextHeartbeat = retryBackoffMs;
+        else
+            delayToNextHeartbeat = heartbeatInterval;
 
-        if (timeSinceLastHeartbeat > interval)
+        if (timeSinceLastHeartbeat > delayToNextHeartbeat)
             return 0;
         else
-            return interval - timeSinceLastHeartbeat;
+            return delayToNextHeartbeat - timeSinceLastHeartbeat;
     }
 
     public boolean sessionTimeoutExpired(long now) {
-        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
+        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
     }
 
     public long interval() {
-        return interval;
+        return heartbeatInterval;
     }
 
-    public void resetSessionTimeout(long now) {
+    public void resetTimeouts(long now) {
         this.lastSessionReset = now;
+        this.lastPoll = now;
+        this.heartbeatFailed = false;
+    }
+
+    public boolean pollTimeoutExpired(long now) {
+        return now - lastPoll > maxPollInterval;
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 71c16fa..b21d13e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -196,7 +196,7 @@ public class RequestFuture<T> {
     }
 
     public static RequestFuture<Void> voidSuccess() {
-        RequestFuture<Void> future = new RequestFuture<Void>();
+        RequestFuture<Void> future = new RequestFuture<>();
         future.complete(null);
         return future;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d27ec8a..313477f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -572,9 +572,28 @@ public class Protocol {
                                                                             new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
                                                                             "List of protocols that the member supports"));
 
+    public static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(new Field("group_id",
+                                                                            STRING,
+                                                                            "The group id."),
+                                                                  new Field("session_timeout",
+                                                                            INT32,
+                                                                            "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+                                                                  new Field("rebalance_timeout",
+                                                                            INT32,
+                                                                            "The maximum time in ms that the coordinator will wait for each member to rejoin when rebalancing the group."),
+                                                                  new Field("member_id",
+                                                                            STRING,
+                                                                            "The assigned consumer id or an empty string for a new consumer."),
+                                                                  new Field("protocol_type",
+                                                                            STRING,
+                                                                            "Unique name for class of protocols implemented by group"),
+                                                                  new Field("group_protocols",
+                                                                            new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0),
+                                                                            "List of protocols that the member supports"));
 
     public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING),
                                                                           new Field("member_metadata", BYTES));
+
     public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                    new Field("generation_id",
                                                                              INT32,
@@ -591,8 +610,10 @@ public class Protocol {
                                                                    new Field("members",
                                                                              new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
-    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
-    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
+    public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
+
+    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1};
+    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1};
 
     /* SyncGroup api */
     public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 14a6c1d..2845ee0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -24,10 +24,11 @@ import java.util.Collections;
 import java.util.List;
 
 public class JoinGroupRequest extends AbstractRequest {
-    
+
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
+    private static final String REBALANCE_TIMEOUT_KEY_NAME = "rebalance_timeout";
     private static final String MEMBER_ID_KEY_NAME = "member_id";
     private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
     private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols";
@@ -38,6 +39,7 @@ public class JoinGroupRequest extends AbstractRequest {
 
     private final String groupId;
     private final int sessionTimeout;
+    private final int rebalanceTimeout;
     private final String memberId;
     private final String protocolType;
     private final List<ProtocolMetadata> groupProtocols;
@@ -60,14 +62,40 @@ public class JoinGroupRequest extends AbstractRequest {
         }
     }
 
+    // v0 constructor
+    @Deprecated
+    public JoinGroupRequest(String groupId,
+                            int sessionTimeout,
+                            String memberId,
+                            String protocolType,
+                            List<ProtocolMetadata> groupProtocols) {
+        this(0, groupId, sessionTimeout, sessionTimeout, memberId, protocolType, groupProtocols);
+    }
+
     public JoinGroupRequest(String groupId,
                             int sessionTimeout,
+                            int rebalanceTimeout,
                             String memberId,
                             String protocolType,
                             List<ProtocolMetadata> groupProtocols) {
-        super(new Struct(CURRENT_SCHEMA));
+        this(1, groupId, sessionTimeout, rebalanceTimeout, memberId, protocolType, groupProtocols);
+    }
+
+    private JoinGroupRequest(int version,
+                             String groupId,
+                             int sessionTimeout,
+                             int rebalanceTimeout,
+                             String memberId,
+                             String protocolType,
+                             List<ProtocolMetadata> groupProtocols) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.JOIN_GROUP.id, version)));
+
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
+
+        if (version >= 1)
+            struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout);
+
         struct.set(MEMBER_ID_KEY_NAME, memberId);
         struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
 
@@ -82,6 +110,7 @@ public class JoinGroupRequest extends AbstractRequest {
         struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
         this.groupId = groupId;
         this.sessionTimeout = sessionTimeout;
+        this.rebalanceTimeout = rebalanceTimeout;
         this.memberId = memberId;
         this.protocolType = protocolType;
         this.groupProtocols = groupProtocols;
@@ -89,8 +118,17 @@ public class JoinGroupRequest extends AbstractRequest {
 
     public JoinGroupRequest(Struct struct) {
         super(struct);
+
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
+
+        if (struct.hasField(REBALANCE_TIMEOUT_KEY_NAME))
+            // rebalance timeout is added in v1
+            rebalanceTimeout = struct.getInt(REBALANCE_TIMEOUT_KEY_NAME);
+        else
+            // v0 had no rebalance timeout but used session timeout implicitly
+            rebalanceTimeout = sessionTimeout;
+
         memberId = struct.getString(MEMBER_ID_KEY_NAME);
         protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME);
 
@@ -107,13 +145,16 @@ public class JoinGroupRequest extends AbstractRequest {
     public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
+            case 1:
                 return new JoinGroupResponse(
+                        versionId,
                         Errors.forException(e).code(),
                         JoinGroupResponse.UNKNOWN_GENERATION_ID,
                         JoinGroupResponse.UNKNOWN_PROTOCOL,
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
                         Collections.<String, ByteBuffer>emptyMap());
+
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
@@ -128,6 +169,10 @@ public class JoinGroupRequest extends AbstractRequest {
         return sessionTimeout;
     }
 
+    public int rebalanceTimeout() {
+        return rebalanceTimeout;
+    }
+
     public String memberId() {
         return memberId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index dd829ed..8895ace 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -24,7 +24,8 @@ import java.util.List;
 import java.util.Map;
 
 public class JoinGroupResponse extends AbstractRequestResponse {
-    
+
+    private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id);
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
@@ -65,7 +66,17 @@ public class JoinGroupResponse extends AbstractRequestResponse {
                              String memberId,
                              String leaderId,
                              Map<String, ByteBuffer> groupMembers) {
-        super(new Struct(CURRENT_SCHEMA));
+        this(CURRENT_VERSION, errorCode, generationId, groupProtocol, memberId, leaderId, groupMembers);
+    }
+
+    public JoinGroupResponse(int version,
+                             short errorCode,
+                             int generationId,
+                             String groupProtocol,
+                             String memberId,
+                             String leaderId,
+                             Map<String, ByteBuffer> groupMembers) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version)));
 
         struct.set(ERROR_CODE_KEY_NAME, errorCode);
         struct.set(GENERATION_ID_KEY_NAME, generationId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index a76f48e..6cf93a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -50,8 +50,6 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
      *  UNKNOWN_TOPIC_OR_PARTITION (3)  <- only for request v0
      *  GROUP_LOAD_IN_PROGRESS (14)
      *  NOT_COORDINATOR_FOR_GROUP (16)
-     *  ILLEGAL_GENERATION (22)
-     *  UNKNOWN_MEMBER_ID (25)
      *  TOPIC_AUTHORIZATION_FAILED (29)
      *  GROUP_AUTHORIZATION_FAILED (30)
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8b52664..8d2ac00 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -90,8 +90,7 @@ public class KafkaConsumerTest {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
-                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+            new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
             assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
@@ -314,17 +313,17 @@ public class KafkaConsumerTest {
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
-        return new KafkaConsumer<byte[], byte[]>(
-            props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
     @Test
-    public void verifyHeartbeatSent() {
+    public void verifyHeartbeatSent() throws Exception {
         String topic = "topic";
         TopicPartition partition = new TopicPartition(topic, 0);
 
+        int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
+        int heartbeatIntervalMs = 1000;
         int autoCommitIntervalMs = 10000;
 
         Time time = new MockTime();
@@ -337,7 +336,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
 
         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
             @Override
@@ -370,9 +369,6 @@ public class KafkaConsumerTest {
         consumer.poll(0);
         assertEquals(Collections.singleton(partition), consumer.assignment());
 
-        // heartbeat interval is 2 seconds
-        time.sleep(heartbeatIntervalMs);
-
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
             @Override
@@ -382,18 +378,23 @@ public class KafkaConsumerTest {
             }
         }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
 
+        // heartbeat interval is 1 second
+        time.sleep(heartbeatIntervalMs);
+        Thread.sleep(heartbeatIntervalMs);
+
         consumer.poll(0);
 
         assertTrue(heartbeatReceived.get());
     }
 
     @Test
-    public void verifyHeartbeatSentWhenFetchedDataReady() {
+    public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
         String topic = "topic";
         TopicPartition partition = new TopicPartition(topic, 0);
 
+        int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
-        int heartbeatIntervalMs = 3000;
+        int heartbeatIntervalMs = 1000;
         int autoCommitIntervalMs = 10000;
 
         Time time = new MockTime();
@@ -406,7 +407,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@@ -438,8 +439,6 @@ public class KafkaConsumerTest {
         client.respondFrom(fetchResponse(partition, 0, 5), node);
         client.poll(0, time.milliseconds());
 
-        time.sleep(heartbeatIntervalMs);
-
         client.prepareResponseFrom(fetchResponse(partition, 5, 0), node);
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
@@ -450,6 +449,9 @@ public class KafkaConsumerTest {
             }
         }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
 
+        time.sleep(heartbeatIntervalMs);
+        Thread.sleep(heartbeatIntervalMs);
+
         consumer.poll(0);
 
         assertTrue(heartbeatReceived.get());
@@ -459,6 +461,7 @@ public class KafkaConsumerTest {
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
         String topic = "topic";
         final TopicPartition partition = new TopicPartition(topic, 0);
+        int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 3000;
         int heartbeatIntervalMs = 2000;
         int autoCommitIntervalMs = 1000;
@@ -473,7 +476,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
         consumer.assign(Arrays.asList(partition));
         consumer.seekToBeginning(Arrays.asList(partition));
 
@@ -496,6 +499,7 @@ public class KafkaConsumerTest {
         long offset1 = 10000;
         long offset2 = 20000;
 
+        int rebalanceTimeoutMs = 6000;
         int sessionTimeoutMs = 3000;
         int heartbeatIntervalMs = 2000;
         int autoCommitIntervalMs = 1000;
@@ -510,7 +514,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
         consumer.assign(Arrays.asList(partition1));
 
         // lookup coordinator
@@ -541,6 +545,7 @@ public class KafkaConsumerTest {
         String topic = "topic";
         final TopicPartition partition = new TopicPartition(topic, 0);
 
+        int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 3000;
 
@@ -558,7 +563,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@@ -619,6 +624,7 @@ public class KafkaConsumerTest {
         String topic = "topic";
         final TopicPartition partition = new TopicPartition(topic, 0);
 
+        int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 3000;
 
@@ -636,7 +642,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@@ -725,6 +731,7 @@ public class KafkaConsumerTest {
                                                       KafkaClient client,
                                                       Metadata metadata,
                                                       PartitionAssignor assignor,
+                                                      int rebalanceTimeoutMs,
                                                       int sessionTimeoutMs,
                                                       int heartbeatIntervalMs,
                                                       int autoCommitIntervalMs) {
@@ -757,6 +764,7 @@ public class KafkaConsumerTest {
         ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
                 consumerClient,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 assignors,
@@ -800,6 +808,9 @@ public class KafkaConsumerTest {
                 metrics,
                 subscriptions,
                 metadata,
+                autoCommitEnabled,
+                autoCommitIntervalMs,
+                heartbeatIntervalMs,
                 retryBackoffMs,
                 requestTimeoutMs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 7a05eb1..77f9df5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -16,15 +16,20 @@
  **/
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
@@ -37,12 +42,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class AbstractCoordinatorTest {
 
     private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
-    private static final int SESSION_TIMEOUT_MS = 30000;
+    private static final int REBALANCE_TIMEOUT_MS = 60000;
+    private static final int SESSION_TIMEOUT_MS = 10000;
     private static final int HEARTBEAT_INTERVAL_MS = 3000;
     private static final long RETRY_BACKOFF_MS = 100;
     private static final long REQUEST_TIMEOUT_MS = 40000;
@@ -77,8 +85,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testCoordinatorDiscoveryBackoff() {
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 
         // blackout the coordinator for 50 milliseconds to simulate a disconnect.
         // after backing off, we should be able to connect.
@@ -91,17 +99,65 @@ public class AbstractCoordinatorTest {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
-    private Struct groupCoordinatorResponse(Node node, short error) {
-        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
+    @Test
+    public void testUncaughtExceptionInHeartbeatThread() throws Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+
+        final RuntimeException e = new RuntimeException();
+
+        // raise the error when the background thread tries to send a heartbeat
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                if (request.request().header().apiKey() == ApiKeys.HEARTBEAT.id)
+                    throw e;
+                return false;
+            }
+        }, heartbeatResponse(Errors.UNKNOWN));
+
+        try {
+            coordinator.ensureActiveGroup();
+            mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+            synchronized (coordinator) {
+                coordinator.notify();
+            }
+            Thread.sleep(100);
+
+            coordinator.pollHeartbeat(mockTime.milliseconds());
+            fail("Expected pollHeartbeat to raise an error");
+        } catch (RuntimeException exception) {
+            assertEquals(exception, e);
+        }
+    }
+
+    private Struct groupCoordinatorResponse(Node node, Errors error) {
+        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error.code(), node);
         return response.toStruct();
     }
 
+    private Struct heartbeatResponse(Errors error) {
+        HeartbeatResponse response = new HeartbeatResponse(error.code());
+        return response.toStruct();
+    }
+
+    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
+        return new JoinGroupResponse(error.code(), generationId, "dummy-subprotocol", memberId, leaderId,
+                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+    }
+
+    private Struct syncGroupResponse(Errors error) {
+        return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0)).toStruct();
+    }
+
     public class DummyCoordinator extends AbstractCoordinator {
 
         public DummyCoordinator(ConsumerNetworkClient client,
                                 Metrics metrics,
                                 Time time) {
-            super(client, GROUP_ID, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics,
+            super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics,
                     METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS);
         }
 


Mime
View raw message