kafka-commits mailing list archives

From guozh...@apache.org
Subject [3/3] kafka git commit: KAFKA-1910; Refactor new consumer and fixed a bunch of corner cases / unit tests; reviewed by Onur Karaman and Jay Kreps
Date Tue, 10 Mar 2015 18:20:38 GMT
KAFKA-1910; Refactor new consumer and fixed a bunch of corner cases / unit tests; reviewed by Onur Karaman and Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b92cec1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b92cec1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b92cec1

Branch: refs/heads/trunk
Commit: 0b92cec1e07a1f2d9aa70f3ecd7d0fb12290d2e2
Parents: 67940c4
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Mar 10 11:19:48 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Mar 10 11:19:48 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/CommonClientConfigs.java      |   2 +-
 .../org/apache/kafka/clients/KafkaClient.java   |  11 +-
 .../org/apache/kafka/clients/NetworkClient.java |  14 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  28 +
 .../kafka/clients/consumer/KafkaConsumer.java   | 903 +++----------------
 .../clients/consumer/internals/Coordinator.java | 595 ++++++++++++
 .../clients/consumer/internals/Fetcher.java     | 459 ++++++++++
 .../clients/consumer/internals/Heartbeat.java   |  16 +-
 .../consumer/internals/SubscriptionState.java   |  21 +-
 .../kafka/clients/producer/KafkaProducer.java   |  26 +-
 .../kafka/clients/producer/ProducerConfig.java  |  26 +
 .../clients/producer/internals/Sender.java      |   5 +-
 .../apache/kafka/common/network/Selector.java   |   5 +-
 .../apache/kafka/common/protocol/Errors.java    |  29 +-
 .../kafka/common/record/MemoryRecords.java      |  12 +-
 .../kafka/common/requests/FetchResponse.java    |  11 +
 .../common/requests/HeartbeatResponse.java      |   6 +
 .../kafka/common/requests/JoinGroupRequest.java |   2 +
 .../common/requests/JoinGroupResponse.java      |   7 +
 .../common/requests/ListOffsetResponse.java     |   7 +
 .../kafka/common/requests/MetadataResponse.java |  14 +
 .../common/requests/OffsetCommitResponse.java   |   6 +
 .../common/requests/OffsetFetchResponse.java    |   9 +
 .../kafka/common/requests/ProduceResponse.java  |   7 +
 .../org/apache/kafka/clients/MockClient.java    |  59 +-
 .../consumer/internals/CoordinatorTest.java     | 284 ++++++
 .../clients/consumer/internals/FetcherTest.java | 177 ++++
 .../consumer/internals/HeartbeatTest.java       |  45 +
 .../internals/SubscriptionStateTest.java        |   3 +-
 .../internals/RecordAccumulatorTest.java        |   3 +
 .../clients/producer/internals/SenderTest.java  |  24 +-
 .../kafka/common/record/MemoryRecordsTest.java  |   2 +
 .../main/scala/kafka/common/ErrorMapping.scala  |   6 +-
 .../common/NoOffsetsCommittedException.scala    |  27 +
 .../kafka/common/OffsetMetadataAndError.scala   |   2 +-
 .../kafka/coordinator/ConsumerCoordinator.scala |  38 +-
 .../kafka/coordinator/ConsumerRegistry.scala    |   8 +-
 .../kafka/coordinator/DelayedHeartbeat.scala    |   2 +-
 .../scala/kafka/coordinator/GroupRegistry.scala |   7 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../integration/kafka/api/ConsumerTest.scala    | 301 +++++++
 .../kafka/api/IntegrationTestHarness.scala      |  12 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |   2 +-
 .../integration/KafkaServerTestHarness.scala    |   2 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  22 +
 46 files changed, 2346 insertions(+), 905 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 06fcfe6..cf32e4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -54,5 +54,5 @@ public class CommonClientConfigs {
 
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
-    
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 8a3e55a..96ac6d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -54,10 +54,19 @@ public interface KafkaClient {
     public long connectionDelay(Node node, long now);
 
     /**
+     * Check if the connection to the node has failed, based on the connection state. Such connection failures are
+     * usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)}
+     * call, but there are cases where transient failures need to be caught and reacted upon.
+     *
+     * @param node the node to check
+     * @return true iff the connection has failed and the node is disconnected
+     */
+    public boolean connectionFailed(Node node);
+
+    /**
      * Queue up the given request for sending. Requests can only be sent on ready connections.
      * 
      * @param request The request
-     * @param now The current time
      */
     public void send(ClientRequest request);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index a7fa4a9..f429502 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -133,6 +133,19 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Check if the connection to the node has failed, based on the connection state. Such connection failures are
+     * usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)}
+     * call, but there are cases where transient failures need to be caught and reacted upon.
+     *
+     * @param node the node to check
+     * @return true iff the connection has failed and the node is disconnected
+     */
+    @Override
+    public boolean connectionFailed(Node node) {
+        return connectionStates.connectionState(node.id()).equals(ConnectionState.DISCONNECTED);
+    }
+
+    /**
      * Check if the node with the given id is ready to send more requests.
      * 
      * @param node The node
@@ -174,7 +187,6 @@ public class NetworkClient implements KafkaClient {
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
      * 
      * @param request The request
-     * @param now The current time
      */
     @Override
     public void send(ClientRequest request) {

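The connectionFailed() accessor added above gives callers a way to detect a dropped connection and back off instead of failing outright. The following is an illustrative sketch only (it is not part of this commit, and the class name is hypothetical) of how a caller might combine ready(), poll() and connectionFailed() to wait for a node connection while treating disconnects as transient:

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.Utils;

public class CoordinatorConnectionSketch {

    private final KafkaClient client;
    private final long retryBackoffMs;

    public CoordinatorConnectionSketch(KafkaClient client, long retryBackoffMs) {
        this.client = client;
        this.retryBackoffMs = retryBackoffMs;
    }

    /**
     * Block until a connection to the given node is ready. A failed connection is
     * treated as transient: back off and let the next ready() call re-initiate it.
     */
    public void awaitReady(Node node) {
        long now = System.currentTimeMillis();
        while (!client.ready(node, now)) {
            // let the client make progress on the in-flight connection attempt
            client.poll(retryBackoffMs, now);
            if (client.connectionFailed(node))
                Utils.sleep(retryBackoffMs);   // transient failure: back off before retrying
            now = System.currentTimeMillis();
        }
    }
}
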
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5fb2100..42c7219 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -15,7 +15,9 @@ package org.apache.kafka.clients.consumer;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
@@ -23,6 +25,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
 
 /**
  * The consumer configuration keys
@@ -153,6 +156,7 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
     private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
 
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -276,6 +280,30 @@ public class ConsumerConfig extends AbstractConfig {
                                         VALUE_DESERIALIZER_CLASS_DOC);
     }
 
+    public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
+                                                              Deserializer<?> keyDeserializer,
+                                                              Deserializer<?> valueDeserializer) {
+        Map<String, Object> newConfigs = new HashMap<String, Object>();
+        newConfigs.putAll(configs);
+        if (keyDeserializer != null)
+            newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
+        if (keyDeserializer != null)
+            newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
+        return newConfigs;
+    }
+
+    public static Properties addDeserializerToConfig(Properties properties,
+                                                     Deserializer<?> keyDeserializer,
+                                                     Deserializer<?> valueDeserializer) {
+        Properties newProperties = new Properties();
+        newProperties.putAll(properties);
+        if (keyDeserializer != null)
+            newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        if (keyDeserializer != null)
+            newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return newProperties;
+    }
+
     ConsumerConfig(Map<? extends Object, ? extends Object> props) {
         super(CONFIG, props);
     }

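The two addDeserializerToConfig() overloads added above are public so that the KafkaConsumer constructors (see the KafkaConsumer.java changes below) can merge explicitly supplied deserializer instances into the user's configuration before building a ConsumerConfig. A minimal usage sketch, not part of this commit (the class name is hypothetical, and ByteArrayDeserializer is just a stand-in for any Deserializer implementation):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DeserializerConfigSketch {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

        // Copies the map and records the concrete deserializer classes under
        // key.deserializer / value.deserializer so that config validation sees them.
        Map<String, Object> merged = ConsumerConfig.addDeserializerToConfig(
                configs,
                new ByteArrayDeserializer(),    // key deserializer
                new ByteArrayDeserializer());   // value deserializer

        System.out.println(merged.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
    }
}
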
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 67ceb75..2e24653 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -13,14 +13,11 @@
 package org.apache.kafka.clients.consumer;
 
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -28,56 +25,24 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.kafka.clients.ClientRequest;
-import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.ConnectionState;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.internals.Coordinator;
+import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.Heartbeat;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.requests.ConsumerMetadataRequest;
-import org.apache.kafka.common.requests.ConsumerMetadataResponse;
-import org.apache.kafka.common.requests.FetchRequest;
-import org.apache.kafka.common.requests.FetchRequest.PartitionData;
-import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.HeartbeatRequest;
-import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupResponse;
-import org.apache.kafka.common.requests.ListOffsetRequest;
-import org.apache.kafka.common.requests.ListOffsetResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.OffsetFetchRequest;
-import org.apache.kafka.common.requests.OffsetFetchResponse;
-import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -381,32 +346,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final long LATEST_OFFSET_TIMESTAMP = -1L;
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
+    private final Coordinator coordinator;
+    private final Fetcher<K, V> fetcher;
+
     private final Time time;
-    private final ConsumerMetrics metrics;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
+    private final NetworkClient client;
+    private final Metrics metrics;
     private final SubscriptionState subscriptions;
     private final Metadata metadata;
     private final Heartbeat heartbeat;
-    private final NetworkClient client;
-    private final int maxWaitMs;
-    private final int minBytes;
-    private final int fetchSize;
+    private final long retryBackoffMs;
     private final boolean autoCommit;
     private final long autoCommitIntervalMs;
-    private final String group;
-    private final long sessionTimeoutMs;
-    private final long retryBackoffMs;
-    private final String partitionAssignmentStrategy;
-    private final AutoOffsetResetStrategy offsetResetStrategy;
     private final ConsumerRebalanceCallback rebalanceCallback;
-    private final List<PartitionRecords<K, V>> records;
-    private final boolean checkCrcs;
     private long lastCommitAttemptMs;
-    private String consumerId;
-    private Node consumerCoordinator;
     private boolean closed = false;
-    private int generation;
 
     /**
      * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -440,22 +394,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                          ConsumerRebalanceCallback callback,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
-             callback,
-             keyDeserializer,
-             valueDeserializer);
-    }
-
-    private static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
-                                                               Deserializer<?> keyDeserializer,
-                                                               Deserializer<?> valueDeserializer) {
-        Map<String, Object> newConfigs = new HashMap<String, Object>();
-        newConfigs.putAll(configs);
-        if (keyDeserializer != null)
-            newConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-        if (keyDeserializer != null)
-            newConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-        return newConfigs;
+        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
+            callback,
+            keyDeserializer,
+            valueDeserializer);
     }
 
     /**
@@ -486,57 +428,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                          ConsumerRebalanceCallback callback,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
+        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
              callback,
              keyDeserializer,
              valueDeserializer);
     }
 
-    private static Properties addDeserializerToConfig(Properties properties,
-                                                      Deserializer<?> keyDeserializer,
-                                                      Deserializer<?> valueDeserializer) {
-        Properties newProperties = new Properties();
-        newProperties.putAll(properties);
-        if (keyDeserializer != null)
-            newProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
-        if (keyDeserializer != null)
-            newProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
-        return newProperties;
-    }
-
     @SuppressWarnings("unchecked")
     private KafkaConsumer(ConsumerConfig config,
                           ConsumerRebalanceCallback callback,
                           Deserializer<K> keyDeserializer,
                           Deserializer<V> valueDeserializer) {
         log.debug("Starting the Kafka consumer");
-        if (keyDeserializer == null)
-            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                                                                Deserializer.class);
-        else
-            this.keyDeserializer = keyDeserializer;
-        if (valueDeserializer == null)
-            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                                                                  Deserializer.class);
-        else
-            this.valueDeserializer = valueDeserializer;
         if (callback == null)
             this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
                                                                   ConsumerRebalanceCallback.class);
         else
             this.rebalanceCallback = callback;
         this.time = new SystemTime();
-        this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
-        this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
-        this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
-        this.group = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-        this.records = new LinkedList<PartitionRecords<K, V>>();
-        this.sessionTimeoutMs = config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
-        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
-        this.partitionAssignmentStrategy = config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
-        this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
-                                                                         .toUpperCase());
-        this.checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
+        this.heartbeat = new Heartbeat(config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), time.milliseconds());
         this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
         this.lastCommitAttemptMs = time.milliseconds();
@@ -551,33 +461,52 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                                                         MetricsReporter.class);
         reporters.add(new JmxReporter(jmxPrefix));
-        Metrics metrics = new Metrics(metricConfig, reporters, time);
+        this.metrics = new Metrics(metricConfig, reporters, time);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
         this.metadata.update(Cluster.bootstrap(addresses), 0);
 
-        String metricsGroup = "consumer";
+        String metricGrpPrefix = "consumer";
         Map<String, String> metricsTags = new LinkedHashMap<String, String>();
         metricsTags.put("client-id", clientId);
-        long reconnectBackoffMs = config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG);
-        int sendBuffer = config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG);
-        int receiveBuffer = config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG);
-        this.client = new NetworkClient(new Selector(metrics, time, metricsGroup, metricsTags),
+        this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags),
                                         this.metadata,
                                         clientId,
-                                        100,
-                                        reconnectBackoffMs,
-                                        sendBuffer,
-                                        receiveBuffer);
+                                        100, // a fixed large enough value will suffice
+                                        config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                                        config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
+                                        config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
         this.subscriptions = new SubscriptionState();
-        this.metrics = new ConsumerMetrics(metrics, metricsGroup, metricsTags);
+        this.coordinator = new Coordinator(this.client,
+                                           config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+                                           this.retryBackoffMs,
+                                           config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                                           config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                                           this.metadata,
+                                           this.subscriptions,
+                                           metrics,
+                                           metricGrpPrefix,
+                                           metricsTags,
+                                           this.time);
+        this.fetcher = new Fetcher<K, V>(this.client,
+                                              this.retryBackoffMs,
+                                              config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
+                                              config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
+                                              config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
+                                              config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
+                                              config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
+                                              keyDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : keyDeserializer,
+                                              valueDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : valueDeserializer,
+                                              this.metadata,
+                                              this.subscriptions,
+                                              metrics,
+                                              metricGrpPrefix,
+                                              metricsTags,
+                                              this.time);
 
         config.logUnused();
 
-        this.consumerCoordinator = null;
-        this.consumerId = "";
-        this.generation = -1;
         log.debug("Kafka consumer created");
     }
 
@@ -683,21 +612,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         long now = time.milliseconds();
 
         if (subscriptions.partitionsAutoAssigned()) {
-            // get partition assignment if needed
-            if (subscriptions.needsPartitionAssignment()) {
-                joinGroup(now);
-            } else if (!heartbeat.isAlive(now)) {
-                log.error("Failed heartbeat check.");
-                coordinatorDead();
-            } else if (heartbeat.shouldHeartbeat(now)) {
-                initiateHeartbeat(now);
+            if (subscriptions.partitionAssignmentNeeded()) {
+                // rebalance to get partition assignment
+                reassignPartitions(now);
+            } else {
+                // try to heartbeat with the coordinator if needed
+                coordinator.maybeHeartbeat(now);
             }
         }
 
         // fetch positions if we have partitions we're subscribed to that we
         // don't know the offset for
         if (!subscriptions.hasAllFetchPositions())
-            fetchMissingPositionsOrResetThem(this.subscriptions.missingFetchPositions(), now);
+            updateFetchPositions(this.subscriptions.missingFetchPositions(), now);
 
         // maybe autocommit position
         if (shouldAutoCommit(now))
@@ -707,16 +634,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
          * initiate any needed fetches, then block for the timeout the user specified
          */
         Cluster cluster = this.metadata.fetch();
-        reinstateFetches(cluster, now);
+        fetcher.initFetches(cluster, now);
         client.poll(timeout, now);
 
         /*
          * initiate a fetch request for any nodes that we just got a response from without blocking
          */
-        reinstateFetches(cluster, now);
+        fetcher.initFetches(cluster, now);
         client.poll(0, now);
 
-        return new ConsumerRecords<K, V>(consumeBufferedRecords());
+        return new ConsumerRecords<K, V>(fetcher.fetchedRecords());
     }
 
     /**
@@ -737,68 +664,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public synchronized void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
         ensureNotClosed();
         log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
+
         long now = time.milliseconds();
         this.lastCommitAttemptMs = now;
-        if (!offsets.isEmpty()) {
-            Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
-            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
-                offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
-            OffsetCommitRequest req = new OffsetCommitRequest(this.group, this.generation, this.consumerId, offsetData);
-
-            RequestCompletionHandler handler = new RequestCompletionHandler() {
-                public void onComplete(ClientResponse resp) {
-                    if (resp.wasDisconnected()) {
-                        handleDisconnect(resp, time.milliseconds());
-                    } else {
-                        OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
-                        for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
-                            TopicPartition tp = entry.getKey();
-                            short errorCode = entry.getValue();
-                            long offset = offsets.get(tp);
-                            if (errorCode == Errors.NONE.code()) {
-                                log.debug("Committed offset {} for partition {}", offset, tp);
-                                subscriptions.committed(tp, offset);
-                            } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                                    || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                                coordinatorDead();
-                            } else {
-                                log.error("Error committing partition {} at offset {}: {}",
-                                          tp,
-                                          offset,
-                                          Errors.forCode(errorCode).exception().getMessage());
-                            }
-                        }
-                    }
-                    metrics.commitLatency.record(resp.requestLatencyMs());
-                }
-            };
-
-            if (commitType == CommitType.ASYNC) {
-                this.initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
-                return;
-            } else {
-                boolean done;
-                do {
-                    ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT,
-                                                                         req.toStruct(),
-                                                                         handler,
-                                                                         now);
 
-                    // check for errors
-                    done = true;
-                    OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
-                    for (short errorCode : commitResponse.responseData().values()) {
-                        if (errorCode != Errors.NONE.code())
-                            done = false;
-                    }
-                    if (!done) {
-                        log.debug("Error in offset commit, backing off for {} ms before retrying again.",
-                                  this.retryBackoffMs);
-                        Utils.sleep(this.retryBackoffMs);
-                    }
-                } while (!done);
-            }
-        }
+        // commit the offsets with the coordinator
+        boolean syncCommit = commitType.equals(CommitType.SYNC);
+        if (!syncCommit)
+            this.subscriptions.needRefreshCommits();
+        coordinator.commitOffsets(offsets, syncCommit, now);
     }
 
     /**
@@ -837,7 +711,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 : Arrays.asList(partitions);
         for (TopicPartition tp : parts) {
             // TODO: list offset call could be optimized by grouping by node
-            seek(tp, listOffset(tp, EARLIEST_OFFSET_TIMESTAMP));
+            seek(tp, fetcher.offsetBefore(tp, EARLIEST_OFFSET_TIMESTAMP));
         }
     }
 
@@ -850,7 +724,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 : Arrays.asList(partitions);
         for (TopicPartition tp : parts) {
             // TODO: list offset call could be optimized by grouping by node
-            seek(tp, listOffset(tp, LATEST_OFFSET_TIMESTAMP));
+            seek(tp, fetcher.offsetBefore(tp, LATEST_OFFSET_TIMESTAMP));
         }
     }
 
@@ -868,7 +742,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
         Long offset = this.subscriptions.consumed(partition);
         if (offset == null) {
-            fetchMissingPositionsOrResetThem(Collections.singleton(partition), time.milliseconds());
+            updateFetchPositions(Collections.singleton(partition), time.milliseconds());
             return this.subscriptions.consumed(partition);
         } else {
             return offset;
@@ -899,7 +773,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         } else {
             partitionsToFetch = Collections.singleton(partition);
         }
-        this.refreshCommittedOffsets(time.milliseconds(), partitionsToFetch);
+        refreshCommittedOffsets(partitionsToFetch, time.milliseconds());
         Long committed = this.subscriptions.committed(partition);
         if (committed == null)
             throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
@@ -911,7 +785,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(this.metrics.metrics.metrics());
+        return Collections.unmodifiableMap(this.metrics.metrics());
     }
 
     /**
@@ -937,7 +811,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public synchronized void close() {
         log.trace("Closing the Kafka consumer.");
         this.closed = true;
-        this.metrics.metrics.close();
+        this.metrics.close();
         this.client.close();
         log.debug("The Kafka consumer has closed.");
     }
@@ -946,7 +820,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
     }
 
-    /*
+    /**
      * Request a metadata update and wait until it has occurred
      */
     private void awaitMetadataUpdate() {
@@ -957,352 +831,52 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         } while (this.metadata.version() == version);
     }
 
-    /*
-     * Send a join group request to the controller
+    /**
+     * Get partition assignment
      */
-    private void joinGroup(long now) {
-        log.debug("Joining group {}", group);
-
-        // execute the user's callback
+    private void reassignPartitions(long now) {
+        // execute the user's callback before rebalance
+        log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
         try {
-            // TODO: Hmmm, is passing the full Consumer like this actually safe?
-            // Need to think about reentrancy...
             this.rebalanceCallback.onPartitionsRevoked(this, this.subscriptions.assignedPartitions());
         } catch (Exception e) {
             log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
-                    + " failed on partition revocation: ", e);
+                + " failed on partition revocation: ", e);
         }
 
-        // join the group
-        JoinGroupRequest jgr = new JoinGroupRequest(group,
-                                                    (int) this.sessionTimeoutMs,
-                                                    new ArrayList<String>(this.subscriptions.subscribedTopics()),
-                                                    this.consumerId,
-                                                    this.partitionAssignmentStrategy);
-        ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, jgr.toStruct(), null, now);
-        // process the response
-        JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
-        log.debug("Joined group: {}", response);
-        Errors.forCode(response.errorCode()).maybeThrow();
-        this.consumerId = response.consumerId();
-        this.subscriptions.changePartitionAssignment(response.assignedPartitions());
-        this.heartbeat.receivedResponse(now);
+        // get new assigned partitions from the coordinator
+        this.subscriptions.changePartitionAssignment(coordinator.assignPartitions(
+            new ArrayList<String>(this.subscriptions.subscribedTopics()), now));
 
-        // execute the callback
+        // execute the user's callback after rebalance
+        log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
         try {
-            // TODO: Hmmm, is passing the full Consumer like this actually safe?
             this.rebalanceCallback.onPartitionsAssigned(this, this.subscriptions.assignedPartitions());
         } catch (Exception e) {
             log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
-                    + " failed on partition assignment: ", e);
-        }
-
-        // record re-assignment time
-        this.metrics.partitionReassignments.record(time.milliseconds() - now);
-    }
-
-    /*
-     * Empty the record buffer and update the consumed position.
-     */
-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> consumeBufferedRecords() {
-        if (this.subscriptions.needsPartitionAssignment()) {
-            return Collections.emptyMap();
-        } else {
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
-            for (PartitionRecords<K, V> part : this.records) {
-                Long consumed = subscriptions.consumed(part.partition);
-                if (this.subscriptions.assignedPartitions().contains(part.partition)
-                        && (consumed == null || part.fetchOffset == consumed)) {
-                    List<ConsumerRecord<K, V>> partRecs = drained.get(part.partition);
-                    if (partRecs == null) {
-                        partRecs = part.records;
-                        drained.put(part.partition, partRecs);
-                    } else {
-                        partRecs.addAll(part.records);
-                    }
-                    subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
-                } else {
-                    // these records aren't next in line based on the last consumed position, ignore them
-                    // they must be from an obsolete request
-                    log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
-                }
-            }
-            this.records.clear();
-            return drained;
-        }
-    }
-
-    /*
-     * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one
-     */
-    private void reinstateFetches(Cluster cluster, long now) {
-        for (ClientRequest request : createFetchRequests(cluster)) {
-            Node node = cluster.nodeById(request.request().destination());
-            if (client.ready(node, now)) {
-                log.trace("Initiating fetch to node {}: {}", node.id(), request);
-                client.send(request);
-            }
-        }
-    }
-
-    /*
-     * Create fetch requests for all nodes for which we have assigned partitions that have no existing requests in
-     * flight
-     */
-    private List<ClientRequest> createFetchRequests(Cluster cluster) {
-        Map<Integer, Map<TopicPartition, PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, PartitionData>>();
-        for (TopicPartition partition : subscriptions.assignedPartitions()) {
-            Node node = cluster.leaderFor(partition);
-            // if there is a leader and no in-flight requests, issue a new fetch
-            if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
-                Map<TopicPartition, PartitionData> fetch = fetchable.get(node);
-                if (fetch == null) {
-                    fetch = new HashMap<TopicPartition, PartitionData>();
-                    fetchable.put(node.id(), fetch);
-                }
-                long offset = this.subscriptions.fetched(partition);
-                fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
-            }
-        }
-        List<ClientRequest> requests = new ArrayList<ClientRequest>(fetchable.size());
-        for (Map.Entry<Integer, Map<TopicPartition, PartitionData>> entry : fetchable.entrySet()) {
-            int nodeId = entry.getKey();
-            final FetchRequest fetch = new FetchRequest(this.maxWaitMs, minBytes, entry.getValue());
-            RequestSend send = new RequestSend(nodeId, this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
-            RequestCompletionHandler handler = new RequestCompletionHandler() {
-                public void onComplete(ClientResponse response) {
-                    handleFetchResponse(response, fetch);
-                }
-            };
-            requests.add(new ClientRequest(time.milliseconds(), true, send, handler));
-        }
-        return requests;
-    }
-
-    private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
-        if (resp.wasDisconnected()) {
-            handleDisconnect(resp, time.milliseconds());
-        } else {
-            int totalBytes = 0;
-            int totalCount = 0;
-            FetchResponse response = new FetchResponse(resp.responseBody());
-            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                FetchResponse.PartitionData partition = entry.getValue();
-                if (!subscriptions.assignedPartitions().contains(tp)) {
-                    log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
-                } else if (partition.errorCode == Errors.NONE.code()) {
-                    ByteBuffer buffer = partition.recordSet;
-                    buffer.position(buffer.limit()); // TODO: arguably we should not have to muck with the position here
-                    MemoryRecords records = MemoryRecords.readableRecords(buffer);
-                    long fetchOffset = request.fetchData().get(tp).offset;
-                    int bytes = 0;
-                    List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
-                    for (LogEntry logEntry : records) {
-                        parsed.add(parseRecord(tp, logEntry));
-                        bytes += logEntry.size();
-                    }
-                    if (parsed.size() > 0) {
-                        ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
-                        this.subscriptions.fetched(tp, record.offset() + 1);
-                        this.metrics.lag.record(partition.highWatermark - record.offset());
-                        this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
-                    }
-                    this.metrics.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
-                    totalBytes += bytes;
-                    totalCount += parsed.size();
-                } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                        || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()
-                        || partition.errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
-                    this.metadata.requestUpdate();
-                } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                    // TODO: this could be optimized by grouping all out-of-range partitions
-                    resetOffset(tp, time.milliseconds());
-                }
-            }
-            this.metrics.bytesFetched.record(totalBytes);
-            this.metrics.recordsFetched.record(totalCount);
-        }
-        this.metrics.fetchLatency.record(resp.requestLatencyMs());
-    }
-
-    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
-        if (this.checkCrcs)
-            logEntry.record().ensureValid();
-        long offset = logEntry.offset();
-        ByteBuffer keyBytes = logEntry.record().key();
-        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
-        ByteBuffer valueBytes = logEntry.record().value();
-        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(),
-                                                                                 Utils.toArray(valueBytes));
-        return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
-    }
-
-    /*
-     * Begin sending a heartbeat to the controller but don't wait for the response
-     */
-    private void initiateHeartbeat(long now) {
-        ensureCoordinatorReady();
-        log.debug("Sending heartbeat to co-ordinator.");
-        HeartbeatRequest req = new HeartbeatRequest(this.group, this.generation, this.consumerId);
-        RequestSend send = new RequestSend(this.consumerCoordinator.id(),
-                                           this.client.nextRequestHeader(ApiKeys.HEARTBEAT),
-                                           req.toStruct());
-
-        RequestCompletionHandler handler = new RequestCompletionHandler() {
-            public void onComplete(ClientResponse resp) {
-                if (resp.wasDisconnected()) {
-                    coordinatorDead();
-                } else {
-                    HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
-                    if (response.errorCode() == Errors.NONE.code()) {
-                        log.debug("Received successful heartbeat response.");
-                        heartbeat.receivedResponse(time.milliseconds());
-                    } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                            || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                        coordinatorDead();
-                    } else {
-                        throw new KafkaException("Unexpected error in hearbeat response: "
-                                + Errors.forCode(response.errorCode()).exception().getMessage());
-                    }
-                }
-                metrics.heartbeatLatency.record(resp.requestLatencyMs());
-            }
-        };
-        this.client.send(new ClientRequest(now, true, send, handler));
-        this.heartbeat.sentHeartbeat(now);
-    }
-
-    private void coordinatorDead() {
-        log.info("Marking the co-ordinator dead.");
-        heartbeat.markDead();
-        if (subscriptions.partitionsAutoAssigned())
-            subscriptions.clearAssignment();
-        this.consumerCoordinator = null;
-    }
-
-    /*
-     * Initiate a request to the co-ordinator but don't wait for a response.
-     */
-    private void initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
-        log.debug("Issuing co-ordinator request: {}: {}", api, request);
-        ensureCoordinatorReady();
-        RequestHeader header = this.client.nextRequestHeader(api);
-        RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request);
-        ClientRequest clientRequest = new ClientRequest(now, true, send, handler);
-        this.client.send(clientRequest);
-    }
-
-    /*
-     * Repeatedly attempt to send a request to the co-ordinator until a response is received (retry if we are
-     * disconnected). Note that this means any requests sent this way must be idempotent.
-     * 
-     * @return The response
-     */
-    private ClientResponse blockingCoordinatorRequest(ApiKeys api,
-                                                      Struct request,
-                                                      RequestCompletionHandler handler,
-                                                      long now) {
-        do {
-            initiateCoordinatorRequest(api, request, handler, now);
-            List<ClientResponse> responses = this.client.completeAll(consumerCoordinator.id(), now);
-            if (responses.size() == 0) {
-                throw new IllegalStateException("This should not happen.");
-            } else {
-                ClientResponse response = responses.get(responses.size() - 1);
-                if (response.wasDisconnected()) {
-                    handleDisconnect(response, time.milliseconds());
-                    Utils.sleep(this.retryBackoffMs);
-                } else {
-                    return response;
-                }
-            }
-        } while (true);
-    }
-
-    /*
-     * update the current consumer co-ordinator if needed and ensure we have a ready connection to it
-     */
-    private void ensureCoordinatorReady() {
-        while (true) {
-            if (this.consumerCoordinator == null)
-                discoverCoordinator();
-
-            while (true) {
-                boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
-                if (ready) {
-                    return;
-                } else {
-                    log.debug("No connection to co-ordinator, attempting to connect.");
-                    this.client.poll(this.retryBackoffMs, time.milliseconds());
-                    ConnectionState state = this.client.connectionState(this.consumerCoordinator.id());
-                    if (ConnectionState.DISCONNECTED.equals(state)) {
-                        log.debug("Co-ordinator connection failed. Attempting to re-discover.");
-                        coordinatorDead();
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    private void discoverCoordinator() {
-        while (this.consumerCoordinator == null) {
-            log.debug("No consumer co-ordinator known, attempting to discover one.");
-            Node coordinator = fetchConsumerCoordinator();
-
-            if (coordinator == null) {
-                log.debug("No co-ordinator found, backing off.");
-                Utils.sleep(this.retryBackoffMs);
-            } else {
-                log.debug("Found consumer co-ordinator: " + coordinator);
-                this.consumerCoordinator = coordinator;
-            }
+                + " failed on partition assignment: ", e);
         }
     }
 
-    private Node fetchConsumerCoordinator() {
-        // find a node to ask about the co-ordinator
-        Node node = this.client.leastLoadedNode(time.milliseconds());
-        while (node == null || !this.client.ready(node, time.milliseconds())) {
-            long now = time.milliseconds();
-            this.client.poll(this.retryBackoffMs, now);
-            node = this.client.leastLoadedNode(now);
-        }
-
-        // send the metadata request and process all responses
-        long now = time.milliseconds();
-        this.client.send(createConsumerMetadataRequest(now));
-        List<ClientResponse> responses = this.client.completeAll(node.id(), now);
-        if (responses.isEmpty()) {
-            throw new IllegalStateException("This should not happen.");
-        } else {
-            ClientResponse resp = responses.get(responses.size() - 1);
-            if (!resp.wasDisconnected()) {
-                ConsumerMetadataResponse response = new ConsumerMetadataResponse(resp.responseBody());
-                if (response.errorCode() == Errors.NONE.code())
-                    return new Node(Integer.MAX_VALUE - response.node().id(), response.node().host(), response.node().port());
-            }
-        }
-        return null;
-    }
-
     /**
-     * Update our cache of committed positions and then set the fetch position to the committed position (if there is
-     * one) or reset it using the offset reset policy the user has configured.
-     * 
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @param partitions The partitions that needs updating fetch positions
+     * @param now The current time
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
      */
-    private void fetchMissingPositionsOrResetThem(Set<TopicPartition> partitions, long now) {
-        // update the set of committed offsets
-        refreshCommittedOffsets(now, partitions);
+    private void updateFetchPositions(Set<TopicPartition> partitions, long now) {
+        // first refresh the committed positions in case they are not up-to-date
+        refreshCommittedOffsets(partitions, now);
 
-        // reset the fetch position to the committed poisition
+        // reset the fetch position to the committed position
         for (TopicPartition tp : partitions) {
             if (subscriptions.fetched(tp) == null) {
                 if (subscriptions.committed(tp) == null) {
-                    resetOffset(tp, now);
+                    // if the committed position is unknown reset the position
+                    fetcher.resetOffset(tp);
                 } else {
                     log.debug("Resetting offset for partition {} to committed offset");
                     subscriptions.seek(tp, subscriptions.committed(tp));
@@ -1311,289 +885,30 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-    /*
-     * Fetch the given set of partitions and update the cache of committed offsets using the result
-     */
-    private void refreshCommittedOffsets(long now, Set<TopicPartition> partitions) {
-        log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
-        OffsetFetchRequest request = new OffsetFetchRequest(this.group, new ArrayList<TopicPartition>(partitions));
-        ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now);
-        OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
-        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-            TopicPartition tp = entry.getKey();
-            OffsetFetchResponse.PartitionData data = entry.getValue();
-            if (data.hasError()) {
-                log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
-                                                                                        .exception()
-                                                                                        .getMessage());
-            } else if (data.offset >= 0) {
-                // update the position with the offset (-1 seems to indicate no
-                // such offset known)
-                this.subscriptions.committed(tp, data.offset);
-            } else {
-                log.debug("No committed offset for partition " + tp);
-            }
-        }
-    }
-
-    /*
-     * Fetch a single offset before the given timestamp for the partition.
-     */
-    private long listOffset(TopicPartition tp, long ts) {
-        log.debug("Fetching offsets for partition {}.", tp);
-        Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
-        partitions.put(tp, new ListOffsetRequest.PartitionData(ts, 1));
-        while (true) {
-            long now = time.milliseconds();
-            PartitionInfo info = metadata.fetch().partition(tp);
-            if (info == null) {
-                metadata.add(tp.topic());
-                awaitMetadataUpdate();
-            } else if (info.leader() == null) {
-                awaitMetadataUpdate();
-            } else if (this.client.ready(info.leader(), now)) {
-                Node node = info.leader();
-                ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
-                RequestSend send = new RequestSend(node.id(),
-                                                   this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
-                                                   request.toStruct());
-                ClientRequest clientRequest = new ClientRequest(now, true, send, null);
-                this.client.send(clientRequest);
-                List<ClientResponse> responses = this.client.completeAll(node.id(), now);
-                if (responses.isEmpty())
-                    throw new IllegalStateException("This should not happen.");
-                ClientResponse response = responses.get(responses.size() - 1);
-                if (response.wasDisconnected()) {
-                    awaitMetadataUpdate();
-                } else {
-                    ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
-                    short errorCode = lor.responseData().get(tp).errorCode;
-                    if (errorCode == Errors.NONE.code()) {
-                        List<Long> offsets = lor.responseData().get(tp).offsets;
-                        if (offsets.size() != 1)
-                            throw new IllegalStateException("This should not happen.");
-                        return offsets.get(0);
-                    } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                            || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
-                        log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
-                                 tp);
-                        awaitMetadataUpdate();
-                        continue;
-                    } else {
-                        Errors.forCode(errorCode).maybeThrow();
-                    }
-                }
-            } else {
-                client.poll(this.retryBackoffMs, now);
+    /**
+     * Refresh the committed offsets for the given set of partitions and update the cache.
+     */
+    private void refreshCommittedOffsets(Set<TopicPartition> partitions, long now) {
+        // we only need to fetch the latest committed offsets from the coordinator
+        // if a commit is in progress; otherwise our cached committed offsets
+        // are already up-to-date
+        if (subscriptions.refreshCommitsNeeded()) {
+            // contact coordinator to fetch committed offsets
+            Map<TopicPartition, Long> offsets = coordinator.fetchOffsets(partitions, now);
+
+            // update the committed-offset cache with the fetched values
+            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                this.subscriptions.committed(tp, entry.getValue());
             }
         }
     }
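
The comment above captures the main behavioral change in this hunk: committed offsets are served from the consumer's cache, and the coordinator is only contacted when a commit has been issued since the last refresh. A rough sketch of that dirty-flag pattern, with hypothetical types rather than the Coordinator/SubscriptionState classes added by this commit:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    // Illustrative only -- a dirty-flag cache of committed offsets. The real code
    // keys on TopicPartition and splits this between SubscriptionState and Coordinator.
    class CommittedOffsetCacheSketch {
        interface CoordinatorStub {
            Map<String, Long> fetchOffsets(Set<String> partitions);
        }

        private final Map<String, Long> committed = new HashMap<String, Long>();
        private boolean refreshNeeded = false;

        void onCommitSent() { refreshNeeded = true; }           // a commit is in flight

        Map<String, Long> committedOffsets(CoordinatorStub coordinator, Set<String> partitions) {
            if (refreshNeeded) {                                 // cache may be stale: re-fetch
                committed.putAll(coordinator.fetchOffsets(partitions));
                refreshNeeded = false;
            }
            return committed;                                    // otherwise serve the cache
        }
    }

This keeps the common poll() path free of coordinator round trips, at the cost of maintaining one extra flag per commit.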
 
     /*
-     * Create a consumer metadata request for the given group
-     */
-    private ClientRequest createConsumerMetadataRequest(long now) {
-        ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.group);
-        Node destination = this.client.leastLoadedNode(now);
-        if (destination == null) // all nodes are blacked out
-            return null;
-        RequestSend send = new RequestSend(destination.id(),
-                                           this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
-                                           request.toStruct());
-        ClientRequest consumerMetadataRequest = new ClientRequest(now, true, send, null);
-        return consumerMetadataRequest;
-    }
-
-    /**
-     * Reset offsets for the given partition using the offset reset strategy
-     * 
-     * @throws NoOffsetForPartitionException If no offset reset strategy is defined
-     */
-    private void resetOffset(TopicPartition partition, long now) {
-        long timestamp;
-        if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
-            timestamp = EARLIEST_OFFSET_TIMESTAMP;
-        else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
-            timestamp = LATEST_OFFSET_TIMESTAMP;
-        else
-            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
-        log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name()
-                                                                                                        .toLowerCase());
-        this.subscriptions.seek(partition, listOffset(partition, timestamp));
-    }
-
-    private void handleDisconnect(ClientResponse response, long now) {
-        int correlation = response.request().request().header().correlationId();
-        log.debug("Cancelled request {} with correlation id {} due to node {} being disconnected",
-                  response.request(),
-                  correlation,
-                  response.request().request().destination());
-        if (this.consumerCoordinator != null
-                && response.request().request().destination() == this.consumerCoordinator.id())
-            coordinatorDead();
-    }
-
-    /*
      * Check that the consumer hasn't been closed.
      */
     private void ensureNotClosed() {
         if (this.closed)
             throw new IllegalStateException("This consumer has already been closed.");
     }
-
-    private static class PartitionRecords<K, V> {
-        public long fetchOffset;
-        public TopicPartition partition;
-        public List<ConsumerRecord<K, V>> records;
-
-        public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
-            this.fetchOffset = fetchOffset;
-            this.partition = partition;
-            this.records = records;
-        }
-    }
-
-    private static enum AutoOffsetResetStrategy {
-        LATEST, EARLIEST, NONE;
-    }
-
-    private class ConsumerMetrics {
-        public final Metrics metrics;
-        public final Sensor bytesFetched;
-        public final Sensor recordsFetched;
-        public final Sensor fetchLatency;
-        public final Sensor commitLatency;
-        public final Sensor partitionReassignments;
-        public final Sensor heartbeatLatency;
-        public final Sensor lag;
-
-        public ConsumerMetrics(Metrics metrics, String metricsGroup, Map<String, String> tags) {
-            this.metrics = metrics;
-
-            this.bytesFetched = metrics.sensor("bytes-fetched");
-            this.bytesFetched.add(new MetricName("fetch-size-avg",
-                                                 metricsGroup,
-                                                 "The average number of bytes fetched per request",
-                                                 tags), new Avg());
-            this.bytesFetched.add(new MetricName("fetch-size-max",
-                                                 metricsGroup,
-                                                 "The maximum number of bytes fetched per request",
-                                                 tags), new Max());
-            this.bytesFetched.add(new MetricName("bytes-consumed-rate",
-                                                 metricsGroup,
-                                                 "The average number of bytes consumed per second",
-                                                 tags), new Rate());
-
-            this.recordsFetched = metrics.sensor("records-fetched");
-            this.recordsFetched.add(new MetricName("records-per-request-avg",
-                                                   metricsGroup,
-                                                   "The average number of records in each request",
-                                                   tags), new Avg());
-            this.recordsFetched.add(new MetricName("records-consumed-rate",
-                                                   metricsGroup,
-                                                   "The average number of records consumed per second",
-                                                   tags), new Rate());
-
-            this.fetchLatency = metrics.sensor("fetch-latency");
-            this.fetchLatency.add(new MetricName("fetch-latency-avg",
-                                                 metricsGroup,
-                                                 "The average time taken for a fetch request.",
-                                                 tags), new Avg());
-            this.fetchLatency.add(new MetricName("fetch-latency-max",
-                                                 metricsGroup,
-                                                 "The max time taken for any fetch request.",
-                                                 tags), new Max());
-            this.fetchLatency.add(new MetricName("fetch-rate",
-                                                 metricsGroup,
-                                                 "The number of fetch requests per second.",
-                                                 tags), new Rate(new Count()));
-
-            this.commitLatency = metrics.sensor("commit-latency");
-            this.commitLatency.add(new MetricName("commit-latency-avg",
-                                                  metricsGroup,
-                                                  "The average time taken for a commit request",
-                                                  tags), new Avg());
-            this.commitLatency.add(new MetricName("commit-latency-max",
-                                                  metricsGroup,
-                                                  "The max time taken for a commit request",
-                                                  tags), new Max());
-            this.commitLatency.add(new MetricName("commit-rate",
-                                                  metricsGroup,
-                                                  "The number of commit calls per second",
-                                                  tags), new Rate(new Count()));
-
-            this.partitionReassignments = metrics.sensor("reassignment-latency");
-            this.partitionReassignments.add(new MetricName("reassignment-time-avg",
-                                                           metricsGroup,
-                                                           "The average time taken for a partition reassignment",
-                                                           tags), new Avg());
-            this.partitionReassignments.add(new MetricName("reassignment-time-max",
-                                                           metricsGroup,
-                                                           "The max time taken for a partition reassignment",
-                                                           tags), new Avg());
-            this.partitionReassignments.add(new MetricName("reassignment-rate",
-                                                           metricsGroup,
-                                                           "The number of partition reassignments per second",
-                                                           tags), new Rate(new Count()));
-
-            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
-            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
-                                                     metricsGroup,
-                                                     "The max time taken to receive a response to a hearbeat request",
-                                                     tags), new Max());
-            this.heartbeatLatency.add(new MetricName("heartbeat-rate",
-                                                     metricsGroup,
-                                                     "The average number of heartbeats per second",
-                                                     tags), new Rate(new Count()));
-
-            this.lag = metrics.sensor("lag");
-            this.lag.add(new MetricName("lag-max",
-                                        metricsGroup,
-                                        "The maximum lag for any partition in this window",
-                                        tags), new Max());
-
-            Measurable numParts = 
-                new Measurable() {
-                    public double measure(MetricConfig config, long now) {
-                        return subscriptions.assignedPartitions().size();
-                    }
-                };
-            metrics.addMetric(new MetricName("assigned-partitions",
-                                             metricsGroup,
-                                             "The number of partitions currently assigned to this consumer",
-                                             tags),
-                              numParts);
-                              
-            
-            Measurable lastHeartbeat =                               
-                new Measurable() {
-                    public double measure(MetricConfig config, long now) {
-                        return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
-                    }
-                };
-            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
-                                             metricsGroup,
-                                             "The number of seconds since the last controller heartbeat",
-                                             tags), 
-                                             
-                              lastHeartbeat);
-        }
-
-        public void recordTopicFetchMetrics(String topic, int bytes, int records) {
-            // record bytes fetched
-            String name = "topic." + topic + ".bytes-fetched";
-            Sensor bytesFetched = this.metrics.getSensor(name);
-            if (bytesFetched == null)
-                bytesFetched = this.metrics.sensor(name);
-            bytesFetched.record(bytes);
-
-            // record records fetched
-            name = "topic." + topic + ".records-fetched";
-            Sensor recordsFetched = this.metrics.getSensor(name);
-            if (recordsFetched == null)
-                recordsFetched = this.metrics.sensor(name);
-            recordsFetched.record(bytes);
-        }
-    }
 }

