kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-3627: consumer fails to execute delayed tasks in poll when records are available
Date Fri, 06 May 2016 05:24:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 a037d1766 -> 69bc4cac4


KAFKA-3627: consumer fails to execute delayed tasks in poll when records are available

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Liquan Pei <liquanpei@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1295 from hachikuji/KAFKA-3627

(cherry picked from commit 2ff955044aa875176aaa58a9be4a79c494a3fb27)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69bc4cac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69bc4cac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69bc4cac

Branch: refs/heads/0.10.0
Commit: 69bc4cac46e4448a155766ad4c4bfcab687e9ee3
Parents: a037d17
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu May 5 22:24:03 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu May 5 22:24:19 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  69 ++-
 .../consumer/internals/ConsumerCoordinator.java |   7 +-
 .../internals/ConsumerNetworkClient.java        |  55 ++-
 .../clients/consumer/internals/Fetcher.java     |  13 +-
 .../consumer/internals/SubscriptionState.java   |  35 +-
 .../org/apache/kafka/clients/MockClient.java    |  55 ++-
 .../clients/consumer/KafkaConsumerTest.java     | 442 ++++++++++++++++++-
 .../clients/consumer/internals/FetcherTest.java |  34 +-
 8 files changed, 621 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
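
For readers skimming the patch: before this change, KafkaConsumer.pollOnce() checked for already-fetched records before running any delayed tasks, and the fast-return path in poll() used quickPoll(false), which skipped delayed task execution entirely. Under a steady stream of available records, scheduled work such as heartbeats and auto-commits could therefore starve. The fix runs delayed tasks at the top of every pollOnce(). A minimal, hypothetical sketch of the two orderings (Task, Scheduled, and DelayedTaskStarvationSketch are illustrative stand-ins, not Kafka classes):

    import java.util.PriorityQueue;

    public class DelayedTaskStarvationSketch {
        interface Task { void run(long now); }

        static class Scheduled implements Comparable<Scheduled> {
            final long deadlineMs;
            final Task task;
            Scheduled(long deadlineMs, Task task) { this.deadlineMs = deadlineMs; this.task = task; }
            @Override public int compareTo(Scheduled o) { return Long.compare(deadlineMs, o.deadlineMs); }
        }

        private final PriorityQueue<Scheduled> taskQueue = new PriorityQueue<>();

        void schedule(Task task, long deadlineMs) { taskQueue.add(new Scheduled(deadlineMs, task)); }

        // Pre-patch shape: when records are already available, return them
        // immediately; the task queue is never drained on this path, so
        // heartbeats and auto-commits can be postponed indefinitely.
        Object pollOnceBefore(long now, Object readyRecords) {
            if (readyRecords != null)
                return readyRecords;
            runReadyTasks(now);
            return null;
        }

        // Post-patch shape: delayed tasks execute first on every call, so
        // periodic work makes progress even when data is always ready.
        Object pollOnceAfter(long now, Object readyRecords) {
            runReadyTasks(now);
            return readyRecords;
        }

        private void runReadyTasks(long now) {
            while (!taskQueue.isEmpty() && taskQueue.peek().deadlineMs <= now)
                taskQueue.poll().task.run(now);
        }
    }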


http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2373a13..2784644 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -504,7 +504,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.consumer";
 
-    private String clientId;
+    private final String clientId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -517,7 +517,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final SubscriptionState subscriptions;
     private final Metadata metadata;
     private final long retryBackoffMs;
-    private long requestTimeoutMs;
+    private final long requestTimeoutMs;
     private boolean closed = false;
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
@@ -602,10 +602,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             this.time = new SystemTime();
 
-            clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
-            Map<String, String> metricsTags = new LinkedHashMap<String, String>();
+            this.clientId = clientId;
+            Map<String, String> metricsTags = new LinkedHashMap<>();
             metricsTags.put("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -702,6 +703,35 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
+    // visible for testing
+    KafkaConsumer(String clientId,
+                  ConsumerCoordinator coordinator,
+                  Deserializer<K> keyDeserializer,
+                  Deserializer<V> valueDeserializer,
+                  Fetcher<K, V> fetcher,
+                  ConsumerInterceptors<K, V> interceptors,
+                  Time time,
+                  ConsumerNetworkClient client,
+                  Metrics metrics,
+                  SubscriptionState subscriptions,
+                  Metadata metadata,
+                  long retryBackoffMs,
+                  long requestTimeoutMs) {
+        this.clientId = clientId;
+        this.coordinator = coordinator;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.fetcher = fetcher;
+        this.interceptors = interceptors;
+        this.time = time;
+        this.client = client;
+        this.metrics = metrics;
+        this.subscriptions = subscriptions;
+        this.metadata = metadata;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
     /**
      * Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
      * partitions using {@link #assign(Collection)} then this will simply return the same partitions that
@@ -910,14 +940,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // and avoid block waiting for their responses to enable pipelining while the user
                     // is handling the fetched records.
                     //
-                    // NOTE that we use quickPoll() in this case which disables wakeups and delayed
-                    // task execution since the consumed positions has already been updated and we
-                    // must return these records to users to process before being interrupted or
-                    // auto-committing offsets
-                    fetcher.sendFetches(metadata.fetch());
-                    client.quickPoll(false);
-                    return this.interceptors == null
-                        ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
+                    // NOTE: since the consumed position has already been updated, we must not allow
+                    // wakeups or any other errors to be triggered prior to returning the fetched records.
+                    // Additionally, pollNoWakeup does not allow automatic commits to get triggered.
+                    fetcher.sendFetches();
+                    client.pollNoWakeup();
+
+                    if (this.interceptors == null)
+                        return new ConsumerRecords<>(records);
+                    else
+                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }
 
                 long elapsed = time.milliseconds() - start;
@@ -949,18 +981,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         if (!subscriptions.hasAllFetchPositions())
             updateFetchPositions(this.subscriptions.missingFetchPositions());
 
+        long now = time.milliseconds();
+
+        // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
+        client.executeDelayedTasks(now);
+
         // init any new fetches (won't resend pending fetches)
-        Cluster cluster = this.metadata.fetch();
         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
 
         // if data is available already, e.g. from a previous network client poll() call to commit,
         // then just return it immediately
-        if (!records.isEmpty()) {
+        if (!records.isEmpty())
             return records;
-        }
 
-        fetcher.sendFetches(cluster);
-        client.poll(timeout);
+        fetcher.sendFetches();
+        client.poll(timeout, now);
         return fetcher.fetchedRecords();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index c1f373f..a642512 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -379,9 +379,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             }
         });
 
-        // ensure commit has a chance to be transmitted (without blocking on its completion)
-        // note that we allow delayed tasks to be executed in case heartbeats need to be sent
-        client.quickPoll(true);
+        // ensure the commit has a chance to be transmitted (without blocking on its completion).
+        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
+        // explicitly allow heartbeats through delayed task execution.
+        client.pollNoWakeup();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index d4c2656..b65a5b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Higher level consumer access to the network layer with basic support for futures and
- * task scheduling. NOT thread-safe!
+ * task scheduling. This class is not thread-safe, except for wakeup().
  */
 public class ConsumerNetworkClient implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
@@ -53,8 +53,10 @@ public class ConsumerNetworkClient implements Closeable {
     private final Time time;
     private final long retryBackoffMs;
     private final long unsentExpiryMs;
-    // wakeup enabled flag need to be volatile since it is allowed to be accessed concurrently
-    volatile private boolean wakeupsEnabled = true;
+
+    // this count is only accessed from the consumer's main thread
+    private int wakeupDisabledCount = 0;
+
 
     public ConsumerNetworkClient(KafkaClient client,
                                  Metadata metadata,
@@ -182,8 +184,7 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
-     * Poll for any network IO. All send requests will either be transmitted on the network
-     * or failed when this call completes.
+     * Poll for any network IO.
      * @param timeout The maximum time to wait for an IO event.
      * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
@@ -192,14 +193,25 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
+     * Poll for any network IO.
+     * @param timeout timeout in milliseconds
+     * @param now current time in milliseconds
+     */
+    public void poll(long timeout, long now) {
+        poll(timeout, now, true);
+    }
+
+    /**
      * Poll for network IO and return immediately. This will not trigger wakeups,
      * nor will it execute any delayed tasks.
-     * @param executeDelayedTasks Whether to allow delayed task execution (true allows)
      */
-    public void quickPoll(boolean executeDelayedTasks) {
+    public void pollNoWakeup() {
         disableWakeups();
-        poll(0, time.milliseconds(), executeDelayedTasks);
-        enableWakeups();
+        try {
+            poll(0, time.milliseconds(), false);
+        } finally {
+            enableWakeups();
+        }
     }
 
     private void poll(long timeout, long now, boolean executeDelayedTasks) {
@@ -230,6 +242,16 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
+     * Execute delayed tasks now.
+     * @param now current time in milliseconds
+     * @throws WakeupException if a wakeup has been requested
+     */
+    public void executeDelayedTasks(long now) {
+        delayedTasks.poll(now);
+        maybeTriggerWakeup();
+    }
+
+    /**
      * Block until all pending requests from the given node have finished.
      * @param node The node to await requests from
      */
@@ -336,22 +358,29 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void clientPoll(long timeout, long now) {
         client.poll(timeout, now);
-        if (wakeupsEnabled && wakeup.get()) {
+        maybeTriggerWakeup();
+    }
+
+    private void maybeTriggerWakeup() {
+        if (wakeupDisabledCount == 0 && wakeup.get()) {
             wakeup.set(false);
             throw new WakeupException();
         }
     }
 
     public void disableWakeups() {
-        this.wakeupsEnabled = false;
+        wakeupDisabledCount++;
     }
 
     public void enableWakeups() {
-        this.wakeupsEnabled = true;
+        if (wakeupDisabledCount <= 0)
+            throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
+
+        wakeupDisabledCount--;
 
         // re-wakeup the client if the flag was set since previous wake-up call
         // could be cleared by poll(0) while wakeups were disabled
-        if (wakeup.get())
+        if (wakeupDisabledCount == 0 && wakeup.get())
             this.client.wakeup();
     }
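
The counter above replaces the old volatile boolean so that disable/enable pairs nest correctly: an inner pollNoWakeup() can no longer re-arm wakeups while an outer caller still expects them off, and the try/finally in pollNoWakeup() guarantees the counter is restored even if poll() throws. A reduced, stand-alone sketch of the pattern (WakeupGuard is illustrative, not a Kafka class):

    class WakeupGuard {
        private int disabledCount = 0;  // only touched from the owning thread

        void disable() { disabledCount++; }

        void enable() {
            if (disabledCount <= 0)
                throw new IllegalStateException("enable() without matching disable()");
            disabledCount--;
        }

        boolean wakeupsEnabled() { return disabledCount == 0; }

        // Mirrors pollNoWakeup(): pair disable/enable in try/finally so an
        // exception inside the quiet section cannot leave wakeups disabled.
        void runQuietly(Runnable action) {
            disable();
            try {
                action.run();
            } finally {
                enable();
            }
        }
    }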
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f6d3387..0256fe7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -131,10 +131,9 @@ public class Fetcher<K, V> {
     /**
      * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
      *
-     * @param cluster The current cluster metadata
      */
-    public void sendFetches(Cluster cluster) {
-        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
+    public void sendFetches() {
+        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
             final FetchRequest fetch = fetchEntry.getValue();
             client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
                     .addListener(new RequestFutureListener<ClientResponse>() {
@@ -525,8 +524,9 @@ public class Fetcher<K, V> {
      * Create fetch requests for all nodes for which we have assigned partitions
      * that have no existing requests in flight.
      */
-    private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
+    private Map<Node, FetchRequest> createFetchRequests() {
         // create the fetch info
+        Cluster cluster = metadata.fetch();
         Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
         for (TopicPartition partition : fetchablePartitions()) {
             Node node = cluster.leaderFor(partition);
@@ -586,11 +586,14 @@ public class Fetcher<K, V> {
                 ByteBuffer buffer = partition.recordSet;
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
+                boolean skippedRecords = false;
                 for (LogEntry logEntry : records) {
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));
                         bytes += logEntry.size();
+                    } else {
+                        skippedRecords = true;
                     }
                 }
 
@@ -599,7 +602,7 @@ public class Fetcher<K, V> {
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                     this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                } else if (buffer.limit() > 0) {
+                } else if (buffer.limit() > 0 && !skippedRecords) {
                     // we did not read a single message from a non-empty buffer
                     // because that message's size is larger than fetch size, in this case
                     // record this exception
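
The skippedRecords flag above fixes a false positive: a fetch can legitimately return only entries below the current position (for example, when a compressed message set contains offsets before the fetch position), which previously tripped the "message larger than fetch size" error path. A minimal, self-contained sketch of the corrected check (LogEntryStub and the exception type are illustrative stand-ins, not Kafka's actual types):

    final class OversizeCheckSketch {
        static final class LogEntryStub {
            final long offset;
            LogEntryStub(long offset) { this.offset = offset; }
        }

        static void check(Iterable<LogEntryStub> entries, long position, boolean bufferNonEmpty) {
            boolean skippedRecords = false;
            int parsed = 0;
            for (LogEntryStub entry : entries) {
                if (entry.offset >= position)
                    parsed++;               // entry would be parsed and returned
                else
                    skippedRecords = true;  // entry precedes the current position
            }
            // Only an entirely unparseable, non-empty buffer with nothing skipped
            // indicates a single record larger than the fetch size.
            if (parsed == 0 && bufferNonEmpty && !skippedRecords)
                throw new IllegalStateException("record at " + position + " exceeds the fetch size");
        }
    }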

http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index e72a476..ec35115 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -285,7 +285,7 @@ public class SubscriptionState {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
         for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
             TopicPartitionState state = entry.getValue();
-            if (state.hasValidPosition)
+            if (state.hasValidPosition())
                 allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));
         }
         return allConsumed;
@@ -304,7 +304,7 @@ public class SubscriptionState {
     }
 
     public boolean isOffsetResetNeeded(TopicPartition partition) {
-        return assignedState(partition).awaitingReset;
+        return assignedState(partition).awaitingReset();
     }
 
     public OffsetResetStrategy resetStrategy(TopicPartition partition) {
@@ -313,7 +313,7 @@ public class SubscriptionState {
 
     public boolean hasAllFetchPositions() {
         for (TopicPartitionState state : assignment.values())
-            if (!state.hasValidPosition)
+            if (!state.hasValidPosition())
                 return false;
         return true;
     }
@@ -321,7 +321,7 @@ public class SubscriptionState {
     public Set<TopicPartition> missingFetchPositions() {
         Set<TopicPartition> missing = new HashSet<>();
         for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
-            if (!entry.getValue().hasValidPosition)
+            if (!entry.getValue().hasValidPosition())
                 missing.add(entry.getKey());
         return missing;
     }
@@ -359,40 +359,39 @@ public class SubscriptionState {
     }
 
     private static class TopicPartitionState {
-        private Long position;
+        private Long position; // last consumed position
         private OffsetAndMetadata committed;  // last committed position
-
-        private boolean hasValidPosition; // whether we have valid consumed and fetched positions
         private boolean paused;  // whether this partition has been paused by the user
-        private boolean awaitingReset; // whether we are awaiting reset
-        private OffsetResetStrategy resetStrategy;  // the reset strategy if awaitingReset is set
+        private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
 
         public TopicPartitionState() {
             this.paused = false;
             this.position = null;
             this.committed = null;
-            this.awaitingReset = false;
-            this.hasValidPosition = false;
             this.resetStrategy = null;
         }
 
         private void awaitReset(OffsetResetStrategy strategy) {
-            this.awaitingReset = true;
             this.resetStrategy = strategy;
             this.position = null;
-            this.hasValidPosition = false;
+        }
+
+        public boolean awaitingReset() {
+            return resetStrategy != null;
+        }
+
+        public boolean hasValidPosition() {
+            return position != null;
         }
 
         private void seek(long offset) {
             this.position = offset;
-            this.awaitingReset = false;
             this.resetStrategy = null;
-            this.hasValidPosition = true;
         }
 
         private void position(long offset) {
-            if (!hasValidPosition)
-                throw new IllegalStateException("Cannot update fetch position without valid consumed/fetched positions");
+            if (!hasValidPosition())
+                throw new IllegalStateException("Cannot set a new position without a valid current position");
             this.position = offset;
         }
 
@@ -409,7 +408,7 @@ public class SubscriptionState {
         }
 
         private boolean isFetchable() {
-            return !paused && hasValidPosition;
+            return !paused && hasValidPosition();
         }
 
     }
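
The refactoring above removes the parallel hasValidPosition and awaitingReset booleans and derives them from the authoritative fields, so the flags can no longer drift out of sync with position and resetStrategy. A condensed sketch of the idea (PartitionStateSketch is illustrative; the real class carries more state):

    final class PartitionStateSketch {
        private Long position;        // null until a consumed position is known
        private String resetStrategy; // null unless a reset is pending (String as a stand-in enum)

        boolean hasValidPosition() { return position != null; }
        boolean awaitingReset()    { return resetStrategy != null; }

        void awaitReset(String strategy) {
            this.resetStrategy = strategy;
            this.position = null;     // hasValidPosition() becomes false automatically
        }

        void seek(long offset) {
            this.position = offset;   // hasValidPosition() becomes true automatically
            this.resetStrategy = null;
        }
    }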

http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 527d283..9fbbb88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -47,11 +47,13 @@ public class MockClient implements KafkaClient {
         public final Struct responseBody;
         public final boolean disconnected;
         public final RequestMatcher requestMatcher;
+        public Node node;
 
-        public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher) {
+        public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher, Node node) {
             this.responseBody = responseBody;
             this.disconnected = disconnected;
             this.requestMatcher = requestMatcher;
+            this.node = node;
         }
 
     }
@@ -124,17 +126,23 @@ public class MockClient implements KafkaClient {
 
     @Override
     public void send(ClientRequest request, long now) {
-        if (!futureResponses.isEmpty()) {
-            FutureResponse futureResp = futureResponses.poll();
+        Iterator<FutureResponse> iterator = futureResponses.iterator();
+        while (iterator.hasNext()) {
+            FutureResponse futureResp = iterator.next();
+            if (futureResp.node != null && !request.request().destination().equals(futureResp.node.idString()))
+                continue;
+
             if (!futureResp.requestMatcher.matches(request))
                 throw new IllegalStateException("Next in line response did not match expected request");
 
             ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
             responses.add(resp);
-        } else {
-            request.setSendTimeMs(now);
-            this.requests.add(request);
+            iterator.remove();
+            return;
         }
+
+        request.setSendTimeMs(now);
+        this.requests.add(request);
     }
 
     @Override
@@ -163,10 +171,31 @@ public class MockClient implements KafkaClient {
         responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
     }
 
+    public void respondFrom(Struct body, Node node) {
+        respondFrom(body, node, false);
+    }
+
+    public void respondFrom(Struct body, Node node, boolean disconnected) {
+        Iterator<ClientRequest> iterator = requests.iterator();
+        while (iterator.hasNext()) {
+            ClientRequest request = iterator.next();
+            if (request.request().destination().equals(node.idString())) {
+                iterator.remove();
+                responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
+                return;
+            }
+        }
+        throw new IllegalArgumentException("No requests available to node " + node);
+    }
+
     public void prepareResponse(Struct body) {
         prepareResponse(ALWAYS_TRUE, body, false);
     }
 
+    public void prepareResponseFrom(Struct body, Node node) {
+        prepareResponseFrom(ALWAYS_TRUE, body, node, false);
+    }
+
     /**
      * Prepare a response for a request matching the provided matcher. If the matcher does not
      * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
@@ -177,10 +206,18 @@ public class MockClient implements KafkaClient {
         prepareResponse(matcher, body, false);
     }
 
+    public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node) {
+        prepareResponseFrom(matcher, body, node, false);
+    }
+
     public void prepareResponse(Struct body, boolean disconnected) {
         prepareResponse(ALWAYS_TRUE, body, disconnected);
     }
 
+    public void prepareResponseFrom(Struct body, Node node, boolean disconnected) {
+        prepareResponseFrom(ALWAYS_TRUE, body, node, disconnected);
+    }
+
     /**
      * Prepare a response for a request matching the provided matcher. If the matcher does not
      * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
@@ -189,7 +226,11 @@ public class MockClient implements KafkaClient {
      * @param disconnected Whether the request was disconnected
      */
     public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
-        futureResponses.add(new FutureResponse(body, disconnected, matcher));
+        prepareResponseFrom(matcher, body, null, disconnected);
+    }
+
+    public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node, boolean disconnected) {
+        futureResponses.add(new FutureResponse(body, disconnected, matcher, node));
     }
 
     public void setNode(Node node) {
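
The node-aware hooks added above exist because the consumer now converses with two distinct destinations in these tests: the broker and a coordinator synthesized under a different node id. A short usage fragment, assuming the MockTime/cluster fixtures and response structs set up in the tests that follow:

    MockClient client = new MockClient(time);
    Node broker = cluster.nodes().get(0);
    Node coordinator = new Node(Integer.MAX_VALUE - broker.id(), broker.host(), broker.port());

    // Queue a response that is only consumed by a request destined for the
    // coordinator; requests to other nodes skip it and match later expectations.
    client.prepareResponseFrom(heartbeatResponseStruct, coordinator);  // struct built elsewhere

    // Answer a request that was already sent, selected by destination node.
    client.respondFrom(fetchResponseStruct, broker);                   // struct built elsewhere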

http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2272795..694faf2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,21 +16,57 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class KafkaConsumerTest {
 
@@ -64,19 +100,19 @@ public class KafkaConsumerTest {
 
         consumer.subscribe(Collections.singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
-        Assert.assertTrue(consumer.assignment().isEmpty());
+        assertTrue(consumer.assignment().isEmpty());
 
         consumer.subscribe(Collections.<String>emptyList());
-        Assert.assertTrue(consumer.subscription().isEmpty());
-        Assert.assertTrue(consumer.assignment().isEmpty());
+        assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.assignment().isEmpty());
 
         consumer.assign(Collections.singletonList(tp0));
-        Assert.assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.subscription().isEmpty());
         assertEquals(singleton(tp0), consumer.assignment());
 
         consumer.unsubscribe();
-        Assert.assertTrue(consumer.subscription().isEmpty());
-        Assert.assertTrue(consumer.assignment().isEmpty());
+        assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.assignment().isEmpty());
 
         consumer.close();
     }
@@ -124,16 +160,16 @@ public class KafkaConsumerTest {
 
         consumer.assign(Collections.singletonList(tp0));
         assertEquals(singleton(tp0), consumer.assignment());
-        Assert.assertTrue(consumer.paused().isEmpty());
+        assertTrue(consumer.paused().isEmpty());
 
         consumer.pause(singleton(tp0));
         assertEquals(singleton(tp0), consumer.paused());
 
         consumer.resume(singleton(tp0));
-        Assert.assertTrue(consumer.paused().isEmpty());
+        assertTrue(consumer.paused().isEmpty());
 
         consumer.unsubscribe();
-        Assert.assertTrue(consumer.paused().isEmpty());
+        assertTrue(consumer.paused().isEmpty());
 
         consumer.close();
     }
@@ -147,4 +183,392 @@ public class KafkaConsumerTest {
         return new KafkaConsumer<byte[], byte[]>(
             props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
+
+    @Test
+    public void verifyHeartbeatSent() {
+        String topic = "topic";
+        TopicPartition partition = new TopicPartition(topic, 0);
+
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 10000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+
+        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+
+            }
+
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                // set initial position so we don't need a lookup
+                for (TopicPartition partition : partitions)
+                    consumer.seek(partition, 0);
+            }
+        });
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // join group
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+
+        // sync group
+        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+
+        // initial fetch
+        client.prepareResponseFrom(fetchResponse(partition, 0, 0), node);
+
+        consumer.poll(0);
+        assertEquals(Collections.singleton(partition), consumer.assignment());
+
+        // advance time past the heartbeat interval
+        time.sleep(heartbeatIntervalMs);
+
+        final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
+        client.prepareResponseFrom(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                heartbeatReceived.set(true);
+                return true;
+            }
+        }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+
+        consumer.poll(0);
+
+        assertTrue(heartbeatReceived.get());
+    }
+
+    @Test
+    public void verifyHeartbeatSentWhenFetchedDataReady() {
+        String topic = "topic";
+        TopicPartition partition = new TopicPartition(topic, 0);
+
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 10000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+
+            }
+
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                // set initial position so we don't need a lookup
+                for (TopicPartition partition : partitions)
+                    consumer.seek(partition, 0);
+            }
+        });
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // join group
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+
+        // sync group
+        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+
+        consumer.poll(0);
+
+        // respond to the outstanding fetch so that we have data available on the next poll
+        client.respondFrom(fetchResponse(partition, 0, 5), node);
+        client.poll(0, time.milliseconds());
+
+        time.sleep(heartbeatIntervalMs);
+
+        client.prepareResponseFrom(fetchResponse(partition, 5, 0), node);
+        final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
+        client.prepareResponseFrom(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                heartbeatReceived.set(true);
+                return true;
+            }
+        }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+
+        consumer.poll(0);
+
+        assertTrue(heartbeatReceived.get());
+    }
+
+    @Test
+    public void testAutoCommitSentBeforePositionUpdate() {
+        String topic = "topic";
+        final TopicPartition partition = new TopicPartition(topic, 0);
+
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+
+        // adjust auto commit interval lower than heartbeat so we don't need to deal with
+        // a concurrent heartbeat request
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+
+            }
+
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                // set initial position so we don't need a lookup
+                for (TopicPartition partition : partitions)
+                    consumer.seek(partition, 0);
+            }
+        });
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // join group
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+
+        // sync group
+        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+
+        consumer.poll(0);
+
+        // respond to the outstanding fetch so that we have data available on the next poll
+        client.respondFrom(fetchResponse(partition, 0, 5), node);
+        client.poll(0, time.milliseconds());
+
+        time.sleep(autoCommitIntervalMs);
+
+        client.prepareResponseFrom(fetchResponse(partition, 5, 0), node);
+
+        // no data has been returned to the user yet, so the committed offset should be 0
+        final AtomicBoolean commitReceived = new AtomicBoolean(false);
+        client.prepareResponseFrom(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+                OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partition);
+                if (partitionData.offset == 0) {
+                    commitReceived.set(true);
+                    return true;
+                }
+                return false;
+            }
+        }, new OffsetCommitResponse(Collections.singletonMap(partition, Errors.NONE.code())).toStruct(), coordinator);
+
+        consumer.poll(0);
+
+        assertTrue(commitReceived.get());
+    }
+
+    @Test
+    public void testWakeupWithFetchDataAvailable() {
+        String topic = "topic";
+        final TopicPartition partition = new TopicPartition(topic, 0);
+
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+
+        // adjust auto commit interval lower than heartbeat so we don't need to deal with
+        // a concurrent heartbeat request
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+
+            }
+
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                // set initial position so we don't need a lookup
+                for (TopicPartition partition : partitions)
+                    consumer.seek(partition, 0);
+            }
+        });
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // join group
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+
+        // sync group
+        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+
+        consumer.poll(0);
+
+        // respond to the outstanding fetch so that we have data available on the next poll
+        client.respondFrom(fetchResponse(partition, 0, 5), node);
+        client.poll(0, time.milliseconds());
+
+        consumer.wakeup();
+
+        try {
+            consumer.poll(0);
+            fail();
+        } catch (WakeupException e) {
+        }
+
+        // make sure the position hasn't been updated
+        assertEquals(0, consumer.position(partition));
+
+        // the next poll should return the completed fetch
+        ConsumerRecords<String, String> records = consumer.poll(0);
+        assertEquals(5, records.count());
+    }
+
+    private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
+        return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
+                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+    }
+
+    private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
+        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
+        return new SyncGroupResponse(error, buf).toStruct();
+    }
+
+    private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
+        MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+        for (int i = 0; i < count; i++)
+            records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
+        records.close();
+        FetchResponse response = new FetchResponse(Collections.singletonMap(
+                tp, new FetchResponse.PartitionData(Errors.NONE.code(), 5, records.buffer())), 0);
+        return response.toStruct();
+    }
+
+    private KafkaConsumer<String, String> newConsumer(Time time,
+                                                      KafkaClient client,
+                                                      Metadata metadata,
+                                                      PartitionAssignor assignor,
+                                                      int sessionTimeoutMs,
+                                                      int heartbeatIntervalMs,
+                                                      int autoCommitIntervalMs) {
+        // create a consumer with mocked time and mocked network
+
+        String clientId = "mock-consumer";
+        String groupId = "mock-group";
+        String metricGroupPrefix = "consumer";
+        long retryBackoffMs = 100;
+        long requestTimeoutMs = 30000;
+        boolean autoCommitEnabled = true;
+        boolean excludeInternalTopics = true;
+        int minBytes = 1;
+        int maxWaitMs = 500;
+        int fetchSize = 1024 * 1024;
+        int maxPollRecords = Integer.MAX_VALUE;
+        boolean checkCrcs = true;
+
+        Deserializer<String> keyDeserializer = new StringDeserializer();
+        Deserializer<String> valueDeserializer = new StringDeserializer();
+
+        OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST;
+        OffsetCommitCallback defaultCommitCallback = new ConsumerCoordinator.DefaultOffsetCommitCallback();
+        List<PartitionAssignor> assignors = Arrays.asList(assignor);
+        ConsumerInterceptors<String, String> interceptors = null;
+
+        Metrics metrics = new Metrics();
+        SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy);
+        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs);
+        ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
+                consumerClient,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                assignors,
+                metadata,
+                subscriptions,
+                metrics,
+                metricGroupPrefix,
+                time,
+                retryBackoffMs,
+                defaultCommitCallback,
+                autoCommitEnabled,
+                autoCommitIntervalMs,
+                interceptors,
+                excludeInternalTopics);
+
+        Fetcher<String, String> fetcher = new Fetcher<>(
+                consumerClient,
+                minBytes,
+                maxWaitMs,
+                fetchSize,
+                maxPollRecords,
+                checkCrcs,
+                keyDeserializer,
+                valueDeserializer,
+                metadata,
+                subscriptions,
+                metrics,
+                metricGroupPrefix,
+                time,
+                retryBackoffMs);
+
+        return new KafkaConsumer<>(
+                clientId,
+                consumerCoordinator,
+                keyDeserializer,
+                valueDeserializer,
+                fetcher,
+                interceptors,
+                time,
+                consumerClient,
+                metrics,
+                subscriptions,
+                metadata,
+                retryBackoffMs,
+                requestTimeoutMs);
+    }
 }
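
The newConsumer() helper above, together with the package-private KafkaConsumer constructor added earlier in this commit, is a straightforward case of constructor injection: production code gains a seam for its collaborators, and tests supply deterministic substitutes (MockTime, MockClient). A generic, self-contained sketch of the same pattern using java.time.Clock (Widget is illustrative, not part of this patch):

    import java.time.Clock;
    import java.time.Instant;
    import java.time.ZoneOffset;

    final class Widget {
        private final Clock clock;

        Widget() { this(Clock.systemUTC()); }        // production path

        Widget(Clock clock) { this.clock = clock; }  // visible for testing

        long nowMs() { return clock.millis(); }

        public static void main(String[] args) {
            // A test injects a fixed clock and gets a deterministic result.
            Widget w = new Widget(Clock.fixed(Instant.EPOCH, ZoneOffset.UTC));
            System.out.println(w.nowMs());  // prints 0
        }
    }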

http://git-wip-us.apache.org/repos/asf/kafka/blob/69bc4cac/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 49bff10..8fad30f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -123,7 +123,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // normal fetch
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
@@ -158,7 +158,7 @@ public class FetcherTest {
         client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0));
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
@@ -166,14 +166,14 @@ public class FetcherTest {
         assertEquals(1, records.get(0).offset());
         assertEquals(2, records.get(1).offset());
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(1, records.size());
         assertEquals(4L, (long) subscriptions.position(tp));
         assertEquals(3, records.get(0).offset());
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
@@ -198,7 +198,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // normal fetch
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
@@ -223,7 +223,7 @@ public class FetcherTest {
         records.close();
 
         // resize the limit of the buffer to pretend it is only fetch-size large
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse((ByteBuffer) records.buffer().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         fetcher.fetchedRecords();
@@ -235,7 +235,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // resize the limit of the buffer to pretend it is only fetch-size large
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
         consumerClient.poll(0);
         try {
@@ -252,7 +252,7 @@ public class FetcherTest {
         subscriptions.assignFromSubscribed(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
 
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.assignFromSubscribed(Arrays.asList(tp));
@@ -268,7 +268,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         subscriptions.pause(tp);
 
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -282,7 +282,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         subscriptions.pause(tp);
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         assertTrue(client.requests().isEmpty());
     }
 
@@ -291,7 +291,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -303,7 +303,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -315,7 +315,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
@@ -328,7 +328,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
-        fetcherNoAutoReset.sendFetches(cluster);
+        fetcherNoAutoReset.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -341,7 +341,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
-        fetcherNoAutoReset.sendFetches(cluster);
+        fetcherNoAutoReset.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -360,7 +360,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.sendFetches(cluster);
+        fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -519,7 +519,7 @@ public class FetcherTest {
                 }
                 this.records.close();
             }
-            fetcher.sendFetches(cluster);
+            fetcher.sendFetches();
             client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
             consumerClient.poll(0);
             records = fetcher.fetchedRecords().get(tp);

