kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/3] kafka git commit: KAFKA-2123: add callback in commit api and use a delayed queue for async requests; reviewed by Ewen Cheslack-Postava and Guozhang Wang
Date Wed, 15 Jul 2015 19:40:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a7e0ac365 -> 99c0686be


http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6837453..4d9a425 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -138,7 +138,6 @@ public class SubscriptionState {
 
     public void committed(TopicPartition tp, long offset) {
         this.committed.put(tp, offset);
-        this.needsFetchCommittedOffsets = false;
     }
 
     public Long committed(TopicPartition tp) {
@@ -152,6 +151,10 @@ public class SubscriptionState {
     public boolean refreshCommitsNeeded() {
         return this.needsFetchCommittedOffsets;
     }
+
+    public void commitsRefreshed() {
+        this.needsFetchCommittedOffsets = false;
+    }
     
     public void seek(TopicPartition tp, long offset) {
         fetched(tp, offset);
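
A note on the hunk above: committed() no longer clears the needsFetchCommittedOffsets flag as a side effect; the new commitsRefreshed() method makes clearing the flag an explicit step for the refresh path. A minimal sketch of the intended call sequence, assuming a hypothetical fetchCommittedOffsets() helper in place of the coordinator's actual fetch (only the SubscriptionState calls appear in this patch):

    if (subscriptions.refreshCommitsNeeded()) {
        // hypothetical helper standing in for the coordinator's offset fetch
        Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
            subscriptions.committed(entry.getKey(), entry.getValue()); // no longer clears the flag
        subscriptions.commitsRefreshed();                              // flag cleared exactly once, here
    }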

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
new file mode 100644
index 0000000..ba9ce82
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has
+ * not yet been created.
+ */
+public class ConsumerCoordinatorNotAvailableException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ConsumerCoordinatorNotAvailableException() {
+        super();
+    }
+
+    public ConsumerCoordinatorNotAvailableException(String message) {
+        super(message);
+    }
+
+    public ConsumerCoordinatorNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ConsumerCoordinatorNotAvailableException(Throwable cause) {
+        super(cause);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
new file mode 100644
index 0000000..18d61a2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+
+/**
+ * Server disconnected before a request could be completed.
+ */
+public class DisconnectException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public DisconnectException() {
+        super();
+    }
+
+    public DisconnectException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DisconnectException(String message) {
+        super(message);
+    }
+
+    public DisconnectException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
new file mode 100644
index 0000000..d20b74a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class IllegalGenerationException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public IllegalGenerationException() {
+        super();
+    }
+
+    public IllegalGenerationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public IllegalGenerationException(String message) {
+        super(message);
+    }
+
+    public IllegalGenerationException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
new file mode 100644
index 0000000..b6c83b4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code if it receives an offset fetch or commit request for a consumer group for which
+ * it is not the coordinator.
+ */
+public class NotCoordinatorForConsumerException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotCoordinatorForConsumerException() {
+        super();
+    }
+
+    public NotCoordinatorForConsumerException(String message) {
+        super(message);
+    }
+
+    public NotCoordinatorForConsumerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NotCoordinatorForConsumerException(Throwable cause) {
+        super(cause);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
new file mode 100644
index 0000000..016506e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change
+ * for that offsets topic partition).
+ */
+public class OffsetLoadInProgressException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetLoadInProgressException() {
+        super();
+    }
+
+    public OffsetLoadInProgressException(String message) {
+        super(message);
+    }
+
+    public OffsetLoadInProgressException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public OffsetLoadInProgressException(Throwable cause) {
+        super(cause);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
new file mode 100644
index 0000000..9bcbd11
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnknownConsumerIdException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public UnknownConsumerIdException() {
+        super();
+    }
+
+    public UnknownConsumerIdException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public UnknownConsumerIdException(String message) {
+        super(message);
+    }
+
+    public UnknownConsumerIdException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 4c0ecc3..d6c41c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -53,11 +53,11 @@ public enum Errors {
     NETWORK_EXCEPTION(13,
             new NetworkException("The server disconnected before a response was received.")),
     OFFSET_LOAD_IN_PROGRESS(14,
-            new ApiException("The coordinator is loading offsets and can't process requests.")),
+            new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
     CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
-            new ApiException("The coordinator is not available.")),
+            new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")),
     NOT_COORDINATOR_FOR_CONSUMER(16,
-            new ApiException("This is not the correct co-ordinator for this consumer.")),
+            new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")),
     INVALID_TOPIC_EXCEPTION(17,
             new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
     RECORD_LIST_TOO_LARGE(18,
@@ -69,13 +69,13 @@ public enum Errors {
     INVALID_REQUIRED_ACKS(21,
             new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
     ILLEGAL_GENERATION(22,
-            new ApiException("Specified consumer generation id is not valid.")),
+            new IllegalGenerationException("Specified consumer generation id is not valid.")),
     INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23,
             new ApiException("The request partition assignment strategy does not match that of the group.")),
     UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24,
             new ApiException("The request partition assignment strategy is unknown to the broker.")),
     UNKNOWN_CONSUMER_ID(25,
-            new ApiException("The coordinator is not aware of this consumer.")),
+            new UnknownConsumerIdException("The coordinator is not aware of this consumer.")),
     INVALID_SESSION_TIMEOUT(26,
             new ApiException("The session timeout is not within an acceptable range.")),
     COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
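
What the new exception types above buy callers: retriable coordinator errors can now be distinguished from fatal ones with a single instanceof check instead of matching raw error codes. A hedged sketch using the org.apache.kafka.common.errors types added in this patch (Errors.forCode() is assumed from the rest of the enum, which this hunk does not show):

    static void maybeThrow(short code) {
        ApiException e = Errors.forCode(code).exception();
        if (e instanceof RetriableException) {
            // e.g. OFFSET_LOAD_IN_PROGRESS or CONSUMER_COORDINATOR_NOT_AVAILABLE:
            // safe to back off and retry the request
        } else if (e != null) {
            throw e; // non-retriable errors surface immediately
        }
    }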

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
new file mode 100644
index 0000000..9de1cee
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConsumerNetworkClientTest {
+
+    private String topicName = "test";
+    private MockTime time = new MockTime();
+    private MockClient client = new MockClient(time);
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+    private Node node = cluster.nodes().get(0);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+
+    @Test
+    public void send() {
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertEquals(1, consumerClient.pendingRequestCount(node));
+        assertFalse(future.isDone());
+
+        consumerClient.poll(future);
+        assertTrue(future.isDone());
+        assertTrue(future.succeeded());
+
+        ClientResponse clientResponse = future.value();
+        HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+        assertEquals(Errors.NONE.code(), response.errorCode());
+    }
+
+    @Test
+    public void multiSend() {
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(2, consumerClient.pendingRequestCount());
+        assertEquals(2, consumerClient.pendingRequestCount(node));
+
+        consumerClient.awaitPendingRequests(node);
+        assertTrue(future1.succeeded());
+        assertTrue(future2.succeeded());
+    }
+
+    @Test
+    public void schedule() {
+        TestDelayedTask task = new TestDelayedTask();
+        consumerClient.schedule(task, time.milliseconds());
+        consumerClient.poll(0);
+        assertEquals(1, task.executions);
+
+        consumerClient.schedule(task, time.milliseconds() + 100);
+        consumerClient.poll(0);
+        assertEquals(1, task.executions);
+
+        time.sleep(100);
+        consumerClient.poll(0);
+        assertEquals(2, task.executions);
+    }
+
+    @Test
+    public void wakeup() {
+        RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        consumerClient.wakeup();
+        try {
+            consumerClient.poll(0);
+            fail();
+        } catch (ConsumerWakeupException e) {
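+            // expected: wakeup() should interrupt the blocking poll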
+        }
+
+        client.respond(heartbeatResponse(Errors.NONE.code()));
+        consumerClient.poll(future);
+        assertTrue(future.isDone());
+    }
+
+
+    private HeartbeatRequest heartbeatRequest() {
+        return new HeartbeatRequest("group", 1, "consumerId");
+    }
+
+    private Struct heartbeatResponse(short error) {
+        HeartbeatResponse response = new HeartbeatResponse(error);
+        return response.toStruct();
+    }
+
+    private static class TestDelayedTask implements DelayedTask {
+        int executions = 0;
+        @Override
+        public void run(long now) {
+            executions++;
+        }
+    }
+
+}
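
The schedule() test above exercises the delayed task queue that the heartbeat and auto-commit paths build on. A hedged sketch of a self-rescheduling periodic task (the class and its interval are illustrative; only DelayedTask.run(long) and ConsumerNetworkClient.schedule(DelayedTask, long) come from this patch):

    class PeriodicTask implements DelayedTask {
        private final ConsumerNetworkClient client;
        private final long intervalMs;

        PeriodicTask(ConsumerNetworkClient client, long intervalMs) {
            this.client = client;
            this.intervalMs = intervalMs;
        }

        @Override
        public void run(long now) {
            // do the periodic work (e.g. send a heartbeat), then re-arm
            client.schedule(this, now + intervalMs);
        }
    }

Each call to the client's poll() runs whatever tasks have come due, and the task re-arms itself for the next interval, which is the pattern the schedule() test checks against MockTime.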

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index d085fe5..ca832be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -18,13 +18,19 @@ package org.apache.kafka.clients.consumer.internals;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.CommitType;
+import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -36,10 +42,12 @@ import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -51,108 +59,173 @@ public class CoordinatorTest {
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
     private int sessionTimeoutMs = 10;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 5000;
     private String rebalanceStrategy = "not-matter";
-    private MockTime time = new MockTime();
-    private MockClient client = new MockClient(time);
+    private MockTime time;
+    private MockClient client;
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
-    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
-    private Metrics metrics = new Metrics(time);
+    private SubscriptionState subscriptions;
+    private Metadata metadata;
+    private Metrics metrics;
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
-
-    private Coordinator coordinator = new Coordinator(client,
-        groupId,
-        sessionTimeoutMs,
-        rebalanceStrategy,
-        subscriptions,
-        metrics,
-        "consumer" + groupId,
-        metricTags,
-        time);
+    private ConsumerNetworkClient consumerClient;
+    private MockRebalanceCallback rebalanceCallback;
+    private Coordinator coordinator;
 
     @Before
     public void setup() {
+        this.time = new MockTime();
+        this.client = new MockClient(time);
+        this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+        this.metadata = new Metadata(0, Long.MAX_VALUE);
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.metrics = new Metrics(time);
+        this.rebalanceCallback = new MockRebalanceCallback();
+
         client.setNode(node);
+
+        this.coordinator = new Coordinator(consumerClient,
+                groupId,
+                sessionTimeoutMs,
+                rebalanceStrategy,
+                subscriptions,
+                metrics,
+                "consumer" + groupId,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs,
+                rebalanceCallback);
     }
 
     @Test
     public void testNormalHeartbeat() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // normal heartbeat
         time.sleep(sessionTimeoutMs);
-        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
-        assertEquals(1, client.inFlightRequestCount());
-        client.respond(heartbeatResponse(Errors.NONE.code()));
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.succeeded());
     }
 
     @Test
     public void testCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // consumer_coordinator_not_available will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
-        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
-        assertEquals(1, client.inFlightRequestCount());
-        client.respond(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
         time.sleep(sessionTimeoutMs);
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
         assertTrue(coordinator.coordinatorUnknown());
     }
 
     @Test
     public void testNotCoordinator() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // not_coordinator will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
-        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
-        assertEquals(1, client.inFlightRequestCount());
-        client.respond(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
         time.sleep(sessionTimeoutMs);
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), future.exception());
         assertTrue(coordinator.coordinatorUnknown());
     }
 
     @Test
     public void testIllegalGeneration() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(topicName);
         subscriptions.changePartitionAssignment(Collections.singletonList(tp));
 
         time.sleep(sessionTimeoutMs);
-        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
-        assertEquals(1, client.inFlightRequestCount());
-        client.respond(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
         time.sleep(sessionTimeoutMs);
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+    }
+
+    @Test
+    public void testUnknownConsumerId() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // unknown_consumer_id will cause re-partition
+        subscriptions.subscribe(topicName);
+        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+
+        time.sleep(sessionTimeoutMs);
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_CONSUMER_ID.code()));
+        time.sleep(sessionTimeoutMs);
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.UNKNOWN_CONSUMER_ID.exception(), future.exception());
         assertTrue(subscriptions.partitionAssignmentNeeded());
     }
 
     @Test
     public void testCoordinatorDisconnect() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // coordinator disconnect will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
-        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
-        assertEquals(1, client.inFlightRequestCount());
-        client.respond(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
         time.sleep(sessionTimeoutMs);
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
         assertTrue(coordinator.coordinatorUnknown());
     }
 
@@ -162,16 +235,18 @@ public class CoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // normal join group
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        coordinator.assignPartitions(time.milliseconds());
-        client.poll(0, time.milliseconds());
+        coordinator.ensurePartitionAssignment();
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceCallback.revokedCount);
+        assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
+        assertEquals(1, rebalanceCallback.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
     }
 
     @Test
@@ -180,165 +255,228 @@ public class CoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
-        assertTrue(subscriptions.partitionAssignmentNeeded());
+        coordinator.ensureCoordinatorKnown();
 
-        // diconnected from original coordinator will cause re-discover and join again
+        // disconnected from original coordinator will cause re-discover and join again
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
-        coordinator.assignPartitions(time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(subscriptions.partitionAssignmentNeeded());
-
-        // rediscover the coordinator
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
-
-        // try assigning partitions again
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        coordinator.assignPartitions(time.milliseconds());
-        client.poll(0, time.milliseconds());
+        coordinator.ensurePartitionAssignment();
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceCallback.revokedCount);
+        assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
+        assertEquals(1, rebalanceCallback.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
     }
 
+    @Test(expected = ApiException.class)
+    public void testUnknownPartitionAssignmentStrategy() {
+        subscriptions.subscribe(topicName);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // coordinator doesn't like our assignment strategy
+        client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()));
+        coordinator.ensurePartitionAssignment();
+    }
+
+    @Test(expected = ApiException.class)
+    public void testInvalidSessionTimeout() {
+        subscriptions.subscribe(topicName);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // coordinator doesn't like our session timeout
+        client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.INVALID_SESSION_TIMEOUT.code()));
+        coordinator.ensurePartitionAssignment();
+    }
 
     @Test
     public void testCommitOffsetNormal() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
-        // With success flag
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
-        assertTrue(result.isDone());
-        assertTrue(result.succeeded());
 
-        // Without success flag
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
-        client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        AtomicBoolean success = new AtomicBoolean(false);
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, callback(success));
+        consumerClient.poll(0);
+        assertTrue(success.get());
     }
 
     @Test
-    public void testCommitOffsetError() {
+    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // async commit with coordinator not available
+        MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+        consumerClient.poll(0);
+
         assertTrue(coordinator.coordinatorUnknown());
-        // resume
+        assertEquals(1, cb.invoked);
+        assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
+    }
+
+    @Test
+    public void testCommitOffsetAsyncNotCoordinator() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
         // async commit with not coordinator
+        MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+        consumerClient.poll(0);
+
         assertTrue(coordinator.coordinatorUnknown());
-        // resume
+        assertEquals(1, cb.invoked);
+        assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), cb.exception);
+    }
+
+    @Test
+    public void testCommitOffsetAsyncDisconnected() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
-        // sync commit with not_coordinator
+        // async commit with coordinator disconnected
+        MockCommitCallback cb = new MockCommitCallback();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+        consumerClient.poll(0);
+
+        assertTrue(coordinator.coordinatorUnknown());
+        assertEquals(1, cb.invoked);
+        assertTrue(cb.exception instanceof DisconnectException);
+    }
+
+    @Test
+    public void testCommitOffsetSyncNotCoordinator() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // sync commit with not_coordinator (should rediscover the coordinator and retry the commit)
+        MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
-        assertTrue(result.isDone());
-        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+        assertEquals(1, cb.invoked);
+        assertNull(cb.exception);
+    }
 
-        // sync commit with coordinator disconnected
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+    @Test
+    public void testCommitOffsetSyncCoordinatorNotAvailable() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
 
-        assertEquals(0, client.poll(0, time.milliseconds()).size());
-        assertTrue(result.isDone());
-        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+        // sync commit with coordinator_not_available (should rediscover the coordinator and retry the commit)
+        MockCommitCallback cb = new MockCommitCallback();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+        assertEquals(1, cb.invoked);
+        assertNull(cb.exception);
+    }
 
+    @Test
+    public void testCommitOffsetSyncCoordinatorDisconnected() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
-        result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
-        assertEquals(1, client.poll(0, time.milliseconds()).size());
-        assertTrue(result.isDone());
-        assertTrue(result.succeeded());
+        // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
+        MockCommitCallback cb = new MockCommitCallback();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+        assertEquals(1, cb.invoked);
+        assertNull(cb.exception);
     }
 
+    @Test(expected = ApiException.class)
+    public void testCommitOffsetSyncThrowsNonRetriableException() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // sync commit with invalid partitions should throw if we have no callback
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, null);
+    }
 
     @Test
-    public void testFetchOffset() {
+    public void testCommitOffsetSyncCallbackHandlesNonRetriableException() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // sync commit with invalid partitions should pass the non-retriable exception to the callback
+        MockCommitCallback cb = new MockCommitCallback();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+        assertTrue(cb.exception instanceof ApiException);
+    }
 
+    @Test
+    public void testRefreshOffset() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
+        coordinator.ensureCoordinatorKnown();
 
-        // normal fetch
+        subscriptions.subscribe(tp);
+        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        RequestFuture<Map<TopicPartition, Long>> result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertEquals(100L, (long) result.value().get(tp));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertEquals(100L, (long) subscriptions.committed(tp));
+    }
 
-        // fetch with loading in progress
+    @Test
+    public void testRefreshOffsetLoadInProgress() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        subscriptions.subscribe(tp);
+        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertEquals(100L, (long) subscriptions.committed(tp));
+    }
 
-        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertTrue(result.failed());
-        assertEquals(RequestFuture.RetryAction.BACKOFF, result.retryAction());
-
-        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertEquals(100L, (long) result.value().get(tp));
+    @Test
+    public void testRefreshOffsetNotCoordinatorForConsumer() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
 
-        // fetch with not coordinator
+        subscriptions.subscribe(tp);
+        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertEquals(100L, (long) subscriptions.committed(tp));
+    }
 
-        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertTrue(result.failed());
-        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
-
-        coordinator.discoverConsumerCoordinator();
-        client.poll(0, time.milliseconds());
-
-        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertEquals(100L, (long) result.value().get(tp));
-
-        // fetch with no fetchable offsets
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
-        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertTrue(result.value().isEmpty());
+    @Test
+    public void testRefreshOffsetWithNoFetchableOffsets() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
 
-        // fetch with offset -1
+        subscriptions.subscribe(tp);
+        subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
-        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertTrue(result.value().isEmpty());
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertNull(subscriptions.committed(tp));
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {
@@ -366,4 +504,45 @@ public class CoordinatorTest {
         OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
         return response.toStruct();
     }
+
+    private ConsumerCommitCallback callback(final AtomicBoolean success) {
+        return new ConsumerCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+                if (exception == null)
+                    success.set(true);
+            }
+        };
+    }
+
+    private static class MockCommitCallback implements ConsumerCommitCallback {
+        public int invoked = 0;
+        public Exception exception = null;
+
+        @Override
+        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+            invoked++;
+            this.exception = exception;
+        }
+    }
+
+    private static class MockRebalanceCallback implements Coordinator.RebalanceCallback {
+        public Collection<TopicPartition> revoked;
+        public Collection<TopicPartition> assigned;
+        public int revokedCount = 0;
+        public int assignedCount = 0;
+
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            this.assigned = partitions;
+            assignedCount++;
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            this.revoked = partitions;
+            revokedCount++;
+        }
+    }
 }
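
The MockCommitCallback above mirrors what a caller of the new commit API supplies. A minimal sketch of an application-level callback, assuming only the ConsumerCommitCallback interface and the commitOffsets() signature exercised in these tests (the logging itself is illustrative):

    class LoggingCommitCallback implements ConsumerCommitCallback {
        @Override
        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
            if (exception != null)
                System.err.println("async commit failed: " + exception);
            // a null exception means the offsets were committed successfully
        }
    }

    // usage, matching the calls in the tests above:
    // coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, new LoggingCommitCallback());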

http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
new file mode 100644
index 0000000..db87b66
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class DelayedTaskQueueTest {
+    private DelayedTaskQueue scheduler = new DelayedTaskQueue();
+    private ArrayList<DelayedTask> executed = new ArrayList<DelayedTask>();
+
+    @Test
+    public void testScheduling() {
+        // Empty scheduler
+        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
+        scheduler.poll(0);
+        assertEquals(Collections.emptyList(), executed);
+
+        TestTask task1 = new TestTask();
+        TestTask task2 = new TestTask();
+        TestTask task3 = new TestTask();
+        scheduler.add(task1, 20);
+        assertEquals(20, scheduler.nextTimeout(0));
+        scheduler.add(task2, 10);
+        assertEquals(10, scheduler.nextTimeout(0));
+        scheduler.add(task3, 30);
+        assertEquals(10, scheduler.nextTimeout(0));
+
+        scheduler.poll(5);
+        assertEquals(Collections.emptyList(), executed);
+        assertEquals(5, scheduler.nextTimeout(5));
+
+        scheduler.poll(10);
+        assertEquals(Arrays.asList(task2), executed);
+        assertEquals(10, scheduler.nextTimeout(10));
+
+        scheduler.poll(20);
+        assertEquals(Arrays.asList(task2, task1), executed);
+        assertEquals(20, scheduler.nextTimeout(10));
+
+        scheduler.poll(30);
+        assertEquals(Arrays.asList(task2, task1, task3), executed);
+        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(30));
+    }
+
+    @Test
+    public void testRemove() {
+        TestTask task1 = new TestTask();
+        TestTask task2 = new TestTask();
+        TestTask task3 = new TestTask();
+        scheduler.add(task1, 20);
+        scheduler.add(task2, 10);
+        scheduler.add(task3, 30);
+        scheduler.add(task1, 40);
+        assertEquals(10, scheduler.nextTimeout(0));
+
+        scheduler.remove(task2);
+        assertEquals(20, scheduler.nextTimeout(0));
+
+        scheduler.remove(task1);
+        assertEquals(30, scheduler.nextTimeout(0));
+
+        scheduler.remove(task3);
+        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
+    }
+
+    private class TestTask implements DelayedTask {
+        @Override
+        public void run(long now) {
+            executed.add(this);
+        }
+    }
+
+}

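The test pins down the queue's contract: add() schedules a task at an absolute time, poll() runs everything due at or before the given time, and nextTimeout() reports how long the caller may block (Long.MAX_VALUE when the queue is empty). A minimal sketch of the event-loop usage this enables, relying only on the methods exercised above (the task body and the sleep are illustrative):

    DelayedTaskQueue tasks = new DelayedTaskQueue();
    long start = System.currentTimeMillis();

    // schedule a one-shot task 3 seconds from now (illustrative body)
    tasks.add(new DelayedTask() {
        @Override
        public void run(long now) {
            System.out.println("task fired at " + now);
        }
    }, start + 3000);

    static void runUntilDrained(DelayedTaskQueue tasks) throws InterruptedException {
        while (true) {
            long now = System.currentTimeMillis();
            tasks.poll(now);                        // run everything that is due
            long timeout = tasks.nextTimeout(now);  // how long the caller may block
            if (timeout == Long.MAX_VALUE)
                return;                             // nothing left to run
            Thread.sleep(timeout);                  // stand-in for blocking network I/O
        }
    }
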
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 405efdc..7a4e586 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -52,6 +52,7 @@ public class FetcherTest {
     private int minBytes = 1;
     private int maxWaitMs = 0;
     private int fetchSize = 1000;
+    private long retryBackoffMs = 100;
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
@@ -60,10 +61,11 @@ public class FetcherTest {
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
+    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
 
-    private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client,
+    private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(consumerClient,
         minBytes,
         maxWaitMs,
         fetchSize,
@@ -75,7 +77,8 @@ public class FetcherTest {
         metrics,
         "consumer" + groupId,
         metricTags,
-        time);
+        time,
+        retryBackoffMs);
 
     @Before
     public void setup() throws Exception {
@@ -97,9 +100,9 @@ public class FetcherTest {
         subscriptions.consumed(tp, 0);
 
         // normal fetch
-        fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
-        client.poll(0, time.milliseconds());
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(3, records.size());
         assertEquals(4L, (long) subscriptions.fetched(tp)); // this is the next fetching position
@@ -119,24 +122,24 @@ public class FetcherTest {
         subscriptions.consumed(tp, 0);
 
         // fetch with not leader
-        fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
-        client.poll(0, time.milliseconds());
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
+        consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
 
         // fetch with unknown topic partition
-        fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
-        client.poll(0, time.milliseconds());
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
+        consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
 
         // fetch with out of range
         subscriptions.fetched(tp, 5);
-        fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
-        client.poll(0, time.milliseconds());
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        consumerClient.poll(0);
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(null, subscriptions.fetched(tp));
@@ -151,9 +154,9 @@ public class FetcherTest {
         subscriptions.consumed(tp, 5);
 
         // fetch with out of range
-        fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
-        client.poll(0, time.milliseconds());
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        consumerClient.poll(0);
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(null, subscriptions.fetched(tp));

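Every hunk above applies the same mechanical migration: the fetcher now goes through a ConsumerNetworkClient instead of the raw client, so the canned broker response is staged with prepareResponse() before the send, and I/O is driven by consumerClient.poll(0) rather than client.poll(). The recurring three-line pattern, lifted out for clarity (fetchResponse() is the test helper used above):

    fetcher.initFetches(cluster);              // enqueue the fetch request
    client.prepareResponse(                    // canned response, staged before the send
        fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
    consumerClient.poll(0);                    // send the request and deliver the response
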
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index ee1ede0..b587e14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -51,4 +51,19 @@ public class HeartbeatTest {
         assertEquals(0, heartbeat.timeToNextHeartbeat(100));
         assertEquals(0, heartbeat.timeToNextHeartbeat(200));
     }
+
+    @Test
+    public void testSessionTimeoutExpired() {
+        heartbeat.sentHeartbeat(time.milliseconds());
+        time.sleep(305);
+        assertTrue(heartbeat.sessionTimeoutExpired(time.milliseconds()));
+    }
+
+    @Test
+    public void testResetSession() {
+        heartbeat.sentHeartbeat(time.milliseconds());
+        time.sleep(305);
+        heartbeat.resetSessionTimeout(time.milliseconds());
+        assertFalse(heartbeat.sessionTimeoutExpired(time.milliseconds()));
+    }
 }

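The two new tests cover the session bookkeeping that Heartbeat now tracks; the 305 ms sleep suggests the fixture uses a 300 ms session timeout. A sketch of how a coordinator loop might consume this state, using only the methods the tests exercise (the rejoin step is illustrative):

    long now = System.currentTimeMillis();
    if (heartbeat.sessionTimeoutExpired(now)) {
        // no successful heartbeat inside the session timeout:
        // assume the coordinator is dead and rejoin the group (illustrative)
    } else if (heartbeat.timeToNextHeartbeat(now) == 0) {
        heartbeat.sentHeartbeat(now);           // record the send
        // ...and on a successful heartbeat response:
        heartbeat.resetSessionTimeout(now);     // the session is alive again
    }
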
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
new file mode 100644
index 0000000..7372754
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RequestFutureTest {
+
+    @Test
+    public void testComposeSuccessCase() {
+        RequestFuture<String> future = new RequestFuture<String>();
+        RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<String, Integer>() {
+            @Override
+            public void onSuccess(String value, RequestFuture<Integer> future) {
+                future.complete(value.length());
+            }
+        });
+
+        future.complete("hello");
+
+        assertTrue(composed.isDone());
+        assertTrue(composed.succeeded());
+        assertEquals(5, (int) composed.value());
+    }
+
+    @Test
+    public void testComposeFailureCase() {
+        RequestFuture<String> future = new RequestFuture<String>();
+        RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<String, Integer>() {
+            @Override
+            public void onSuccess(String value, RequestFuture<Integer> future) {
+                future.complete(value.length());
+            }
+        });
+
+        RuntimeException e = new RuntimeException();
+        future.raise(e);
+
+        assertTrue(composed.isDone());
+        assertTrue(composed.failed());
+        assertEquals(e, composed.exception());
+    }
+
+}

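Taken together, the two cases define compose(): the adapter maps a successful upstream value into the downstream future, while a raise() on the upstream future propagates to the composed one untouched. A short sketch of the adaptation this enables, using only the API exercised above (the string-to-int parsing is illustrative):

    RequestFuture<String> raw = new RequestFuture<String>();
    RequestFuture<Integer> parsed = raw.compose(new RequestFutureAdapter<String, Integer>() {
        @Override
        public void onSuccess(String response, RequestFuture<Integer> future) {
            future.complete(Integer.parseInt(response)); // or future.raise(...) on a bad payload
        }
    });

    raw.complete("42");
    if (parsed.succeeded())
        System.out.println(parsed.value());  // 42; a raise() on raw would surface via parsed.exception()
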
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 92ffb91..3eb5f95 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -12,17 +12,13 @@
  */
 package kafka.api
 
+import java.{lang, util}
+
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
 
 import kafka.utils.{TestUtils, Logging}
 import kafka.server.KafkaConfig
@@ -46,6 +42,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   val topic = "topic"
   val part = 0
   val tp = new TopicPartition(topic, part)
+  val part2 = 1
+  val tp2 = new TopicPartition(topic, part2)
 
   // configure the servers and clients
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
@@ -56,12 +54,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-  
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+
   override def setUp() {
     super.setUp()
 
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+    TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers)
   }
 
   def testSimpleConsumption() {
@@ -74,6 +73,45 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     
     this.consumers(0).seek(tp, 0)
     consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+
+    // check async commit callbacks
+    val commitCallback = new CountConsumerCommitCallback()
+    this.consumers(0).commit(CommitType.ASYNC, commitCallback)
+
+    // shouldn't make progress until poll is invoked
+    Thread.sleep(10)
+    assertEquals(0, commitCallback.count)
+    awaitCommitCallback(this.consumers(0), commitCallback)
+  }
+
+  def testCommitSpecifiedOffsets() {
+    sendRecords(5, tp)
+    sendRecords(7, tp2)
+
+    this.consumers(0).subscribe(tp)
+    this.consumers(0).subscribe(tp2)
+
+    // Need to poll to join the group
+    this.consumers(0).poll(50)
+    val pos1 = this.consumers(0).position(tp)
+    val pos2 = this.consumers(0).position(tp2)
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)), CommitType.SYNC)
+    assertEquals(3, this.consumers(0).committed(tp))
+    intercept[NoOffsetForPartitionException] {
+      this.consumers(0).committed(tp2)
+    }
+    // positions should not change
+    assertEquals(pos1, this.consumers(0).position(tp))
+    assertEquals(pos2, this.consumers(0).position(tp2))
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)), CommitType.SYNC)
+    assertEquals(3, this.consumers(0).committed(tp))
+    assertEquals(5, this.consumers(0).committed(tp2))
+
+    // Using async should pick up the committed changes after commit completes
+    val commitCallback = new CountConsumerCommitCallback()
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)), CommitType.ASYNC, commitCallback)
+    awaitCommitCallback(this.consumers(0), commitCallback)
+    assertEquals(7, this.consumers(0).committed(tp2))
   }
 
   def testAutoOffsetReset() {
@@ -150,7 +188,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
 
   def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
     val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
     consumer0.subscribe(topic)
         
@@ -172,6 +210,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     // this should cause another callback execution
     while(callback.callsToAssigned < 2)
       consumer0.poll(50)
+
     assertEquals(2, callback.callsToAssigned)
     assertEquals(2, callback.callsToRevoked)
 
@@ -191,9 +230,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     } 
   }
 
-  private def sendRecords(numRecords: Int) {
+  private def sendRecords(numRecords: Int): Unit = {
+    sendRecords(numRecords, tp)
+  }
+
+  private def sendRecords(numRecords: Int, tp: TopicPartition) {
     val futures = (0 until numRecords).map { i =>
-      this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes))
     }
     futures.map(_.get)
   }
@@ -218,4 +261,18 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
+  private def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
+    val startCount = commitCallback.count
+    val started = System.currentTimeMillis()
+    while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
+      consumer.poll(10000) // poll the consumer that owns the callback, not a hard-coded one
+    assertEquals(startCount + 1, commitCallback.count)
+  }
+
+  private class CountConsumerCommitCallback extends ConsumerCommitCallback {
+    var count = 0
+
+    override def onComplete(offsets: util.Map[TopicPartition, lang.Long], exception: Exception): Unit = count += 1
+  }
+
 }
\ No newline at end of file

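The Scala tests above exercise the new commit callback end to end; the equivalent Java usage, assuming an already-constructed consumer and only the API surface the tests call (commit with a CommitType plus a ConsumerCommitCallback), is:

    // consumer is an existing KafkaConsumer<byte[], byte[]> (assumed constructed elsewhere)
    consumer.commit(CommitType.ASYNC, new ConsumerCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
            if (exception != null)
                exception.printStackTrace();  // the commit failed and may be retried
        }
    });

    // as testSimpleConsumption() asserts, the callback cannot fire until poll() runs again
    consumer.poll(100);
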
