kafka-commits mailing list archives

From jun...@apache.org
Subject [2/2] kafka git commit: KAFKA-2440; Use `NetworkClient` instead of `SimpleConsumer` to fetch data from replica
Date Fri, 11 Sep 2015 23:08:12 GMT
KAFKA-2440; Use `NetworkClient` instead of `SimpleConsumer` to fetch data from replica

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Aditya Auradkar <aauradkar@linkedin.com>, Jun Rao <junrao@gmail.com>

Closes #194 from ijuma/kafka-2440-use-network-client-in-fetcher


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65bf3afe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65bf3afe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65bf3afe

Branch: refs/heads/trunk
Commit: 65bf3afe86ef883136fcd6bc857724b25e750bd7
Parents: 6c1957d
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Sep 11 16:08:00 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Sep 11 16:08:00 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |   3 +-
 .../clients/consumer/internals/Fetcher.java     |   6 +-
 .../kafka/common/metrics/JmxReporter.java       |  26 ++
 .../apache/kafka/common/metrics/Metrics.java    |  65 +++-
 .../kafka/common/metrics/MetricsReporter.java   |   6 +
 .../apache/kafka/common/network/Selector.java   |  43 ++-
 .../common/requests/ListOffsetRequest.java      |   3 +
 .../clients/consumer/internals/FetcherTest.java |  10 +-
 .../common/metrics/FakeMetricsReporter.java     |   5 +-
 .../kafka/common/metrics/MetricsTest.java       |  67 ++++-
 .../apache/kafka/test/MockMetricsReporter.java  |   5 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |  90 ++++--
 .../controller/ControllerChannelManager.scala   |   6 +-
 .../kafka/controller/KafkaController.scala      |  20 --
 .../kafka/server/AbstractFetcherThread.scala    | 212 +++++++------
 .../main/scala/kafka/server/KafkaServer.scala   |   3 +-
 .../kafka/server/ReplicaFetcherManager.scala    |   9 +-
 .../kafka/server/ReplicaFetcherThread.scala     | 171 +++++++++--
 .../scala/kafka/server/ReplicaManager.scala     |   8 +-
 .../integration/BaseTopicMetadataTest.scala     | 295 +++++++++++++++++++
 .../PlaintextTopicMetadataTest.scala            |  23 ++
 .../integration/SslTopicMetadataTest.scala      |  24 ++
 .../kafka/integration/TopicMetadataTest.scala   | 291 ------------------
 .../kafka/server/BaseReplicaFetchTest.scala     |  84 ++++++
 .../server/HighwatermarkPersistenceTest.scala   |   8 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   8 +-
 .../server/PlaintextReplicaFetchTest.scala      |  22 ++
 .../unit/kafka/server/ReplicaFetchTest.scala    |  79 -----
 .../unit/kafka/server/ReplicaManagerTest.scala  |  14 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   7 +-
 .../unit/kafka/server/SslReplicaFetchTest.scala |  24 ++
 31 files changed, 1054 insertions(+), 583 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 1302f35..51a6c5d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -153,7 +153,8 @@ public class NetworkClient implements KafkaClient {
     @Override
     public void close(String nodeId) {
         selector.close(nodeId);
-        inFlightRequests.clearAll(nodeId);
+        for (ClientRequest request : inFlightRequests.clearAll(nodeId))
+            metadataUpdater.maybeHandleDisconnection(request);
         connectionStates.remove(nodeId);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a797c79..0efd34d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,8 +61,6 @@ import java.util.Set;
  * This class manages the fetching process with the brokers.
  */
 public class Fetcher<K, V> {
-    public static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
-    public static final long LATEST_OFFSET_TIMESTAMP = -1L;
 
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
 
@@ -220,9 +218,9 @@ public class Fetcher<K, V> {
         OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
         final long timestamp;
         if (strategy == OffsetResetStrategy.EARLIEST)
-            timestamp = EARLIEST_OFFSET_TIMESTAMP;
+            timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
         else if (strategy == OffsetResetStrategy.LATEST)
-            timestamp = LATEST_OFFSET_TIMESTAMP;
+            timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
         else
             throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 6b9590c..6872049 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -78,6 +78,28 @@ public class JmxReporter implements MetricsReporter {
         }
     }
 
+    @Override
+    public void metricRemoval(KafkaMetric metric) {
+        synchronized (LOCK) {
+            KafkaMbean mbean = removeAttribute(metric);
+            if (mbean != null) {
+                if (mbean.metrics.isEmpty())
+                    unregister(mbean);
+                else
+                    reregister(mbean);
+            }
+        }
+    }
+
+    private KafkaMbean removeAttribute(KafkaMetric metric) {
+        MetricName metricName = metric.metricName();
+        String mBeanName = getMBeanName(metricName);
+        KafkaMbean mbean = this.mbeans.get(mBeanName);
+        if (mbean != null)
+            mbean.removeAttribute(metricName.name());
+        return mbean;
+    }
+
     private KafkaMbean addAttribute(KafkaMetric metric) {
         try {
             MetricName metricName = metric.metricName();
@@ -176,6 +198,10 @@ public class JmxReporter implements MetricsReporter {
             }
         }
 
+        public KafkaMetric removeAttribute(String name) {
+            return this.metrics.remove(name);
+        }
+
         @Override
         public MBeanInfo getMBeanInfo() {
             MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];

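The net effect of `metricRemoval` here: when the last attribute of a `KafkaMbean` is removed, the MBean is unregistered from the platform MBean server; otherwise it is re-registered with the remaining attributes. A minimal sketch of the observable behaviour (the "kafka.example" prefix and the metric/group names are invented; the `ObjectName` assumes the `prefix:type=group` layout produced by `getMBeanName`):

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.JmxReporter;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.stats.Count;

    public class JmxRemovalExample {
        public static void main(String[] args) throws Exception {
            Metrics metrics = new Metrics();
            metrics.addReporter(new JmxReporter("kafka.example"));

            MetricName name = new MetricName("demo-count", "demo-group");
            metrics.addMetric(name, new Count());

            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName bean = new ObjectName("kafka.example:type=demo-group");
            System.out.println(server.isRegistered(bean)); // true

            // Triggers JmxReporter.metricRemoval; the MBean had a single
            // attribute, so it is unregistered rather than re-registered.
            metrics.removeMetric(name);
            System.out.println(server.isRegistered(bean)); // false

            metrics.close();
        }
    }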
http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 5f6caf9..42936a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.metrics;
 
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -52,6 +53,7 @@ public class Metrics implements Closeable {
     private final MetricConfig config;
     private final ConcurrentMap<MetricName, KafkaMetric> metrics;
     private final ConcurrentMap<String, Sensor> sensors;
+    private final ConcurrentMap<Sensor, List<Sensor>> childrenSensors;
     private final List<MetricsReporter> reporters;
     private final Time time;
 
@@ -86,8 +88,9 @@ public class Metrics implements Closeable {
      */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
         this.config = defaultConfig;
-        this.sensors = new CopyOnWriteMap<String, Sensor>();
-        this.metrics = new CopyOnWriteMap<MetricName, KafkaMetric>();
+        this.sensors = new CopyOnWriteMap<>();
+        this.metrics = new CopyOnWriteMap<>();
+        this.childrenSensors = new CopyOnWriteMap<>();
         this.reporters = Utils.notNull(reporters);
         this.time = time;
         for (MetricsReporter reporter : reporters)
@@ -136,11 +139,46 @@ public class Metrics implements Closeable {
         if (s == null) {
             s = new Sensor(this, name, parents, config == null ? this.config : config, time);
             this.sensors.put(name, s);
+            if (parents != null) {
+                for (Sensor parent : parents) {
+                    List<Sensor> children = childrenSensors.get(parent);
+                    if (children == null) {
+                        children = new ArrayList<>();
+                        childrenSensors.put(parent, children);
+                    }
+                    children.add(s);
+                }
+            }
         }
         return s;
     }
 
     /**
+     * Remove a sensor (if it exists) along with its associated metrics and child sensors.
+     *
+     * @param name The name of the sensor to be removed
+     */
+    public void removeSensor(String name) {
+        Sensor sensor = sensors.get(name);
+        if (sensor != null) {
+            List<Sensor> childSensors = null;
+            synchronized (sensor) {
+                synchronized (this) {
+                    if (sensors.remove(name, sensor)) {
+                        for (KafkaMetric metric : sensor.metrics())
+                            removeMetric(metric.metricName());
+                        childSensors = childrenSensors.remove(sensor);
+                    }
+                }
+            }
+            if (childSensors != null) {
+                for (Sensor childSensor : childSensors)
+                    removeSensor(childSensor.name());
+            }
+        }
+    }
+
+    /**
      * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
      * This is a way to expose existing values as metrics.
      * @param metricName The name of the metric
@@ -167,10 +205,26 @@ public class Metrics implements Closeable {
     }
 
     /**
+     * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
+     * will be invoked for each reporter.
+     *
+     * @param metricName The name of the metric
+     * @return the removed `KafkaMetric` or null if no such metric exists
+     */
+    public synchronized KafkaMetric removeMetric(MetricName metricName) {
+        KafkaMetric metric = this.metrics.remove(metricName);
+        if (metric != null) {
+            for (MetricsReporter reporter : reporters)
+                reporter.metricRemoval(metric);
+        }
+        return metric;
+    }
+
+    /**
      * Add a MetricReporter
      */
     public synchronized void addReporter(MetricsReporter reporter) {
-        Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
+        Utils.notNull(reporter).init(new ArrayList<>(metrics.values()));
         this.reporters.add(reporter);
     }
 
@@ -190,6 +244,11 @@ public class Metrics implements Closeable {
         return this.metrics;
     }
 
+    /* For testing use only. */
+    Map<Sensor, List<Sensor>> childrenSensors() {
+        return Collections.unmodifiableMap(childrenSensors);
+    }
+
     /**
      * Close this metrics repository.
      */

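Taken together, `removeSensor` and `removeMetric` let a client undo everything it registered against a shared `Metrics` instance. A minimal sketch of the new API (the sensor, metric, and group names are invented; the cascading removal is the behaviour exercised by `testRemoveSensor` in `MetricsTest` further down):

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Count;

    public class SensorRemovalExample {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // Parent/child hierarchy: removing the parent must also clean up the child.
            Sensor parent = metrics.sensor("requests");
            parent.add(new MetricName("request-count", "example-group"), new Count());
            Sensor child = metrics.sensor("requests-detail", parent);
            child.add(new MetricName("request-detail-count", "example-group"), new Count());

            // Removes the sensor, its registered metrics, and (recursively) its children.
            metrics.removeSensor("requests");
            System.out.println(metrics.getSensor("requests"));        // null
            System.out.println(metrics.getSensor("requests-detail")); // null

            metrics.close();
        }
    }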
http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 7acc19e..e2a1d80 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -34,6 +34,12 @@ public interface MetricsReporter extends Configurable {
     public void metricChange(KafkaMetric metric);
 
     /**
+     * This is called whenever a metric is removed.
+     * @param metric the metric that has been removed
+     */
+    public void metricRemoval(KafkaMetric metric);
+
+    /**
      * Called when the metrics repository is closed.
      */
     public void close();

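Because `metricRemoval` is added to the interface without a default implementation, every existing `MetricsReporter` has to be updated (as the test doubles below are). A hypothetical reporter spelling out the full contract, including `configure` inherited from `Configurable`:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.MetricsReporter;

    // Hypothetical reporter that just logs metric lifecycle events.
    public class LoggingMetricsReporter implements MetricsReporter {

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public void init(List<KafkaMetric> metrics) {
            System.out.println("starting with " + metrics.size() + " metrics");
        }

        @Override
        public void metricChange(KafkaMetric metric) {
            System.out.println("added/changed: " + metric.metricName());
        }

        @Override
        public void metricRemoval(KafkaMetric metric) {
            System.out.println("removed: " + metric.metricName());
        }

        @Override
        public void close() {}
    }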
http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 4aa5cbb..e1c6866 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -202,8 +202,8 @@ public class Selector implements Selectable {
      */
     @Override
     public void close() {
-        List<String> connections = new LinkedList<String>(channels.keySet());
-        for (String id: connections)
+        List<String> connections = new ArrayList<>(channels.keySet());
+        for (String id : connections)
             close(id);
         try {
             this.nioSelector.close();
@@ -212,6 +212,7 @@ public class Selector implements Selectable {
         } catch (SecurityException se) {
             log.error("Exception closing nioSelector:", se);
         }
+        sensors.close();
     }
 
     /**
@@ -558,6 +559,10 @@ public class Selector implements Selectable {
         public final Sensor selectTime;
         public final Sensor ioTime;
 
+        /* Names of metrics that are not registered through sensors */
+        private final List<MetricName> topLevelMetricNames = new ArrayList<>();
+        private final List<Sensor> sensors = new ArrayList<>();
+
         public SelectorMetrics(Metrics metrics) {
             this.metrics = metrics;
             String metricGrpName = metricGrpPrefix + "-metrics";
@@ -569,19 +574,19 @@ public class Selector implements Selectable {
                 tagsSuffix.append(tag.getValue());
             }
 
-            this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
+            this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
             MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
             this.connectionClosed.add(metricName, new Rate());
 
-            this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
+            this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
             metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
             this.connectionCreated.add(metricName, new Rate());
 
-            this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
+            this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
             metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
             bytesTransferred.add(metricName, new Rate(new Count()));
 
-            this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
+            this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
             metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
             this.bytesSent.add(metricName, new Rate());
             metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
@@ -591,13 +596,13 @@ public class Selector implements Selectable {
             metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
             this.bytesSent.add(metricName, new Max());
 
-            this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
+            this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
             metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
             this.bytesReceived.add(metricName, new Rate());
             metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
             this.bytesReceived.add(metricName, new Rate(new Count()));
 
-            this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
+            this.selectTime = sensor("select-time:" + tagsSuffix.toString());
             metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
             this.selectTime.add(metricName, new Rate(new Count()));
             metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
@@ -605,13 +610,14 @@ public class Selector implements Selectable {
             metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
             this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
-            this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
+            this.ioTime = sensor("io-time:" + tagsSuffix.toString());
             metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
             this.ioTime.add(metricName, new Avg());
             metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
             this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
             metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
+            topLevelMetricNames.add(metricName);
             this.metrics.addMetric(metricName, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return channels.size();
@@ -619,6 +625,12 @@ public class Selector implements Selectable {
             });
         }
 
+        private Sensor sensor(String name, Sensor... parents) {
+            Sensor sensor = metrics.sensor(name, parents);
+            sensors.add(sensor);
+            return sensor;
+        }
+
         public void maybeRegisterConnectionMetrics(String connectionId) {
             if (!connectionId.isEmpty() && metricsPerConnection) {
                 // if one sensor of the metrics has been registered for the connection,
@@ -631,7 +643,7 @@ public class Selector implements Selectable {
                     Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
                     tags.put("node-id", "node-" + connectionId);
 
-                    nodeRequest = this.metrics.sensor(nodeRequestName);
+                    nodeRequest = sensor(nodeRequestName);
                     MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
                     nodeRequest.add(metricName, new Rate());
                     metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
@@ -642,14 +654,14 @@ public class Selector implements Selectable {
                     nodeRequest.add(metricName, new Max());
 
                     String nodeResponseName = "node-" + connectionId + ".bytes-received";
-                    Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
+                    Sensor nodeResponse = sensor(nodeResponseName);
                     metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
                     nodeResponse.add(metricName, new Rate());
                     metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
                     nodeResponse.add(metricName, new Rate(new Count()));
 
                     String nodeTimeName = "node-" + connectionId + ".latency";
-                    Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
+                    Sensor nodeRequestTime = sensor(nodeTimeName);
                     metricName = new MetricName("request-latency-avg", metricGrpName, tags);
                     nodeRequestTime.add(metricName, new Avg());
                     metricName = new MetricName("request-latency-max", metricGrpName, tags);
@@ -679,6 +691,13 @@ public class Selector implements Selectable {
                     nodeRequest.record(bytes, now);
             }
         }
+
+        public void close() {
+            for (MetricName metricName : topLevelMetricNames)
+                metrics.removeMetric(metricName);
+            for (Sensor sensor : sensors)
+                metrics.removeSensor(sensor.name());
+        }
     }
 
 }

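The `SelectorMetrics` changes reduce to one pattern: route every sensor registration through a helper that remembers the sensor, track stand-alone metric names separately, and undo both in `close()` so that closing a `Selector` no longer leaks metrics into a shared `Metrics` instance. The same pattern, distilled into a stand-alone sketch (the class and method names here are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;

    public class TrackedMetrics {
        private final Metrics metrics;
        private final List<Sensor> sensors = new ArrayList<>();
        private final List<MetricName> topLevelMetricNames = new ArrayList<>();

        public TrackedMetrics(Metrics metrics) {
            this.metrics = metrics;
        }

        // Register sensors through this helper so close() can remove them later.
        public Sensor sensor(String name, Sensor... parents) {
            Sensor sensor = metrics.sensor(name, parents);
            sensors.add(sensor);
            return sensor;
        }

        // Metrics added directly (not via a sensor) are tracked by name.
        public void trackMetricName(MetricName metricName) {
            topLevelMetricNames.add(metricName);
        }

        public void close() {
            for (MetricName metricName : topLevelMetricNames)
                metrics.removeMetric(metricName);
            for (Sensor sensor : sensors)
                metrics.removeSensor(sensor.name());
        }
    }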
http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 6da4a0e..8dfd811 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -31,6 +31,9 @@ import java.util.List;
 import java.util.Map;
 
 public class ListOffsetRequest extends AbstractRequest {
+
+    public static final long EARLIEST_TIMESTAMP = -2L;
+    public static final long LATEST_TIMESTAMP = -1L;
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
     private static final String REPLICA_ID_KEY_NAME = "replica_id";

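Moving the sentinel timestamps onto `ListOffsetRequest` gives the consumer and the replica fetcher a single definition of the two special values. A sketch of using them when building a list-offsets request (the constructor and `PartitionData` shape are assumed from the v0 request class of this era, so treat the exact signatures as illustrative):

    import java.util.Collections;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.ListOffsetRequest;

    public class ListOffsetExample {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // -2 asks for the earliest available offset, -1 for the log-end offset;
            // replica id -1 marks this as an ordinary consumer request.
            ListOffsetRequest request = new ListOffsetRequest(-1,
                Collections.singletonMap(tp,
                    new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 1)));
            System.out.println(request);
        }
    }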
http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index f2a8381..d79a10e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -239,7 +239,7 @@ public class FetcherTest {
         subscriptions.assign(Arrays.asList(tp));
         // with no commit position, we should reset using the default strategy defined above (EARLIEST)
 
-        client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
@@ -253,7 +253,7 @@ public class FetcherTest {
         subscriptions.assign(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
-        client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
@@ -267,7 +267,7 @@ public class FetcherTest {
         subscriptions.assign(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
 
-        client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
@@ -282,11 +282,11 @@ public class FetcherTest {
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
-        client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
 
         // Next one succeeds
-        client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
index 7c7ead1..d5dd9b8 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
@@ -27,6 +27,9 @@ public class FakeMetricsReporter implements MetricsReporter {
     public void metricChange(KafkaMetric metric) {}
 
     @Override
+    public void metricRemoval(KafkaMetric metric) {}
+
+    @Override
     public void close() {}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 0a7dcd8..9096ef7 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -13,6 +13,8 @@
 package org.apache.kafka.common.metrics;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
@@ -134,14 +136,14 @@ public class MetricsTest {
 
         /* each metric should have a count equal to one + its children's count */
         assertEquals(1.0, gc, EPS);
-        assertEquals(1.0 + gc, child1.metrics().get(0).value(), EPS);
+        assertEquals(1.0 + gc, c1, EPS);
         assertEquals(1.0, c2, EPS);
         assertEquals(1.0 + c1, p2, EPS);
         assertEquals(1.0 + c1 + c2, p1, EPS);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testBadSensorHiearchy() {
+    public void testBadSensorHierarchy() {
         Sensor p = metrics.sensor("parent");
         Sensor c1 = metrics.sensor("child1", p);
         Sensor c2 = metrics.sensor("child2", p);
@@ -149,6 +151,67 @@ public class MetricsTest {
     }
 
     @Test
+    public void testRemoveSensor() {
+        Sensor parent1 = metrics.sensor("test.parent1");
+        parent1.add(new MetricName("test.parent1.count", "grp1"), new Count());
+        Sensor parent2 = metrics.sensor("test.parent2");
+        parent2.add(new MetricName("test.parent2.count", "grp1"), new Count());
+        Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
+        child1.add(new MetricName("test.child1.count", "grp1"), new Count());
+        Sensor child2 = metrics.sensor("test.child2", parent2);
+        child2.add(new MetricName("test.child2.count", "grp1"), new Count());
+        Sensor grandChild1 = metrics.sensor("test.gchild2", child2);
+        grandChild1.add(new MetricName("test.gchild2.count", "grp1"), new Count());
+
+        Sensor sensor = metrics.getSensor("test.parent1");
+        assertNotNull(sensor);
+        metrics.removeSensor("test.parent1");
+        assertNull(metrics.getSensor("test.parent1"));
+        assertNull(metrics.metrics().get(new MetricName("test.parent1.count", "grp1")));
+        assertNull(metrics.getSensor("test.child1"));
+        assertNull(metrics.childrenSensors().get(sensor));
+        assertNull(metrics.metrics().get(new MetricName("test.child1.count", "grp1")));
+
+        sensor = metrics.getSensor("test.gchild2");
+        assertNotNull(sensor);
+        metrics.removeSensor("test.gchild2");
+        assertNull(metrics.getSensor("test.gchild2"));
+        assertNull(metrics.childrenSensors().get(sensor));
+        assertNull(metrics.metrics().get(new MetricName("test.gchild2.count", "grp1")));
+
+        sensor = metrics.getSensor("test.child2");
+        assertNotNull(sensor);
+        metrics.removeSensor("test.child2");
+        assertNull(metrics.getSensor("test.child2"));
+        assertNull(metrics.childrenSensors().get(sensor));
+        assertNull(metrics.metrics().get(new MetricName("test.child2.count", "grp1")));
+
+        sensor = metrics.getSensor("test.parent2");
+        assertNotNull(sensor);
+        metrics.removeSensor("test.parent2");
+        assertNull(metrics.getSensor("test.parent2"));
+        assertNull(metrics.childrenSensors().get(sensor));
+        assertNull(metrics.metrics().get(new MetricName("test.parent2.count", "grp1")));
+
+        assertEquals(0, metrics.metrics().size());
+    }
+
+    @Test
+    public void testRemoveMetric() {
+        metrics.addMetric(new MetricName("test1", "grp1"), new Count());
+        metrics.addMetric(new MetricName("test2", "grp1"), new Count());
+
+        assertNotNull(metrics.removeMetric(new MetricName("test1", "grp1")));
+        assertNull(metrics.metrics().get(new MetricName("test1", "grp1")));
+        assertNotNull(metrics.metrics().get(new MetricName("test2", "grp1")));
+
+        assertNotNull(metrics.removeMetric(new MetricName("test2", "grp1")));
+        assertNull(metrics.metrics().get(new MetricName("test2", "grp1")));
+
+        assertEquals(0, metrics.metrics().size());
+    }
+
+    @Test
     public void testEventWindowing() {
         Count count = new Count();
         MetricConfig config = new MetricConfig().eventWindow(1).samples(2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
index 2a8fa1f..de9fcd0 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
@@ -37,9 +37,10 @@ public class MockMetricsReporter implements MetricsReporter {
     }
 
     @Override
-    public void metricChange(KafkaMetric metric) {
+    public void metricChange(KafkaMetric metric) {}
 
-    }
+    @Override
+    public void metricRemoval(KafkaMetric metric) {}
 
     @Override
     public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 33ea728..8801ff8 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -17,12 +17,14 @@
 
 package kafka.consumer
 
+import kafka.api.{OffsetRequest, Request, FetchRequestBuilder, FetchResponsePartitionData}
 import kafka.cluster.BrokerEndPoint
-import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
-import kafka.common.TopicAndPartition
-
+import kafka.server.{PartitionFetchState, AbstractFetcherThread}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import scala.collection.JavaConverters
+import JavaConverters._
+import ConsumerFetcherThread._
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
@@ -32,31 +34,52 @@ class ConsumerFetcherThread(name: String,
         extends AbstractFetcherThread(name = name,
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
-                                      socketTimeout = config.socketTimeoutMs,
-                                      socketBufferSize = config.socketReceiveBufferBytes,
-                                      fetchSize = config.fetchMessageMaxBytes,
-                                      fetcherBrokerId = Request.OrdinaryConsumerId,
-                                      maxWait = config.fetchWaitMaxMs,
-                                      minBytes = config.fetchMinBytes,
                                       fetchBackOffMs = config.refreshLeaderBackoffMs,
                                       isInterruptible = true) {
 
+  type REQ = FetchRequest
+  type PD = PartitionData
+
+  private val clientId = config.clientId
+  private val fetchSize = config.fetchMessageMaxBytes
+
+  private val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, config.socketTimeoutMs,
+    config.socketReceiveBufferBytes, config.clientId)
+
+  private val fetchRequestBuilder = new FetchRequestBuilder().
+    clientId(clientId).
+    replicaId(Request.OrdinaryConsumerId).
+    maxWait(config.fetchWaitMaxMs).
+    minBytes(config.fetchMinBytes).
+    requestVersion(kafka.api.FetchRequest.CurrentVersion)
+
+  override def initiateShutdown(): Boolean = {
+    val justShutdown = super.initiateShutdown()
+    if (justShutdown && isInterruptible)
+      simpleConsumer.disconnectToHandleJavaIOBug()
+    justShutdown
+  }
+
+  override def shutdown(): Unit = {
+    super.shutdown()
+    simpleConsumer.close()
+  }
+
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
+  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) {
     val pti = partitionMap(topicAndPartition)
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                 .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
-    pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+    pti.enqueue(partitionData.underlying.messages.asInstanceOf[ByteBufferMessageSet])
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-    var startTimestamp : Long = 0
-    config.autoOffsetReset match {
-      case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime
-      case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
-      case _ => startTimestamp = OffsetRequest.LatestTime
+    val startTimestamp = config.autoOffsetReset match {
+      case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
+      case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
+      case _ => OffsetRequest.LatestTime
     }
     val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
     val pti = partitionMap(topicAndPartition)
@@ -70,4 +93,37 @@ class ConsumerFetcherThread(name: String,
     removePartitions(partitions.toSet)
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
+
+  protected def buildFetchRequest(partitionMap: collection.Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
+    partitionMap.foreach { case (topicAndPartition, partitionFetchState) =>
+      if (partitionFetchState.isActive)
+        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, partitionFetchState.offset,
+          fetchSize)
+    }
+
+    new FetchRequest(fetchRequestBuilder.build())
+  }
+
+  protected def fetch(fetchRequest: FetchRequest): collection.Map[TopicAndPartition, PartitionData] =
+    simpleConsumer.fetch(fetchRequest.underlying).data.map { case (key, value) =>
+      key -> new PartitionData(value)
+    }
+
+}
+
+object ConsumerFetcherThread {
+
+  class FetchRequest(val underlying: kafka.api.FetchRequest) extends AbstractFetcherThread.FetchRequest {
+    def isEmpty: Boolean = underlying.requestInfo.isEmpty
+    def offset(topicAndPartition: TopicAndPartition): Long = underlying.requestInfo(topicAndPartition).offset
+  }
+
+  class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {
+    def errorCode: Short = underlying.error
+    def toByteBufferMessageSet: ByteBufferMessageSet = underlying.messages.asInstanceOf[ByteBufferMessageSet]
+    def highWatermark: Long = underlying.hw
+    def exception: Option[Throwable] =
+      if (errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode))
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index da1cff0..b1cf668 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -88,7 +88,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
     val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
     val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
-    val networkClient = controllerContext.networkClientMap.getOrElseUpdate(broker.id, {
+    val networkClient = {
       val selector = new Selector(
         NetworkReceive.UNLIMITED,
         config.connectionsMaxIdleMs,
@@ -108,7 +108,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE
       )
-    })
+    }
     val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, networkClient, brokerNode, config, time)
     requestThread.setDaemon(false)
     brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, broker, messageQueue, requestThread))
@@ -116,7 +116,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
 
   private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
     try {
-      brokerState.networkClient.close(brokerState.brokerNode.idString)
+      brokerState.networkClient.close()
       brokerState.messageQueue.clear()
       brokerState.requestSendThread.shutdown()
       brokerStateInfo.remove(brokerState.broker.id)

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2d0845d..284fa23 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -18,7 +18,6 @@ package kafka.controller
 
 import java.util
 
-import org.apache.kafka.clients.NetworkClient
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
 
@@ -58,19 +57,6 @@ class ControllerContext(val zkClient: ZkClient,
   val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
   val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
 
-  /**
-   * This map is used to ensure the following invariant: at most one `NetworkClient`/`Selector` instance should be
-   * created per broker during the lifetime of the `metrics` parameter received by `KafkaController` (which has the same
-   * lifetime as `KafkaController` since they are both shut down during `KafkaServer.shutdown()`).
-   *
-   * If we break this invariant, an exception is thrown during the instantiation of `Selector` due to the usage of
-   * two equal `MetricName` instances for two `Selector` instantiations. This way also helps to maintain the metrics sane.
-   *
-   * In the future, we should consider redesigning `ControllerChannelManager` so that we can use a single
-   * `NetworkClient`/`Selector` for multiple broker connections as that is the intended usage and it may help simplify this code.
-   */
-  private[controller] val networkClientMap = mutable.Map[Int, NetworkClient]()
-
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
 
@@ -135,11 +121,6 @@ class ControllerContext(val zkClient: ZkClient,
     allTopics -= topic
   }
 
-  private[controller] def closeNetworkClients(): Unit = {
-    networkClientMap.values.foreach(_.close())
-    networkClientMap.clear()
-  }
-
 }
 
 
@@ -711,7 +692,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       isRunning = false
     }
     onControllerResignation()
-    controllerContext.closeNetworkClients()
   }
 
   def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index dca975c..21c7e3e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,19 +17,19 @@
 
 package kafka.server
 
+import java.util.concurrent.locks.ReentrantLock
+
 import kafka.cluster.BrokerEndPoint
-import kafka.utils.{Pool, ShutdownableThread}
-import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
-import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
-import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
-import kafka.utils.DelayedItem
-import kafka.utils.CoreUtils.inLock
-import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
+import kafka.consumer.PartitionTopicInfo
+import kafka.message.{InvalidMessageException, MessageAndOffset, ByteBufferMessageSet}
+import kafka.utils.{Pool, ShutdownableThread, DelayedItem}
+import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition}
 import kafka.metrics.KafkaMetricsGroup
-
+import kafka.utils.CoreUtils.inLock
+import org.apache.kafka.common.protocol.Errors
+import AbstractFetcherThread._
 import scala.collection.{mutable, Set, Map}
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.atomic.AtomicLong
 
 import com.yammer.metrics.core.Gauge
@@ -40,35 +40,25 @@ import com.yammer.metrics.core.Gauge
 abstract class AbstractFetcherThread(name: String,
                                      clientId: String,
                                      sourceBroker: BrokerEndPoint,
-                                     socketTimeout: Int,
-                                     socketBufferSize: Int,
-                                     fetchSize: Int,
-                                     fetcherBrokerId: Int = -1,
-                                     maxWait: Int = 0,
-                                     minBytes: Int = 1,
                                      fetchBackOffMs: Int = 0,
-                                     fetchRequestVersion: Short = FetchRequest.CurrentVersion,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
+
+  type REQ <: FetchRequest
+  type PD <: PartitionData
+
   private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
+
   private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
-  val fetchRequestBuilder = new FetchRequestBuilder().
-          clientId(clientId).
-          replicaId(fetcherBrokerId).
-          maxWait(maxWait).
-          minBytes(minBytes).
-          requestVersion(fetchRequestVersion)
 
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long,
-                           partitionData: FetchResponsePartitionData)
+  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)
 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
@@ -76,45 +66,40 @@ abstract class AbstractFetcherThread(name: String,
   // deal with partitions with errors, potentially due to leadership changes
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
 
+  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ
+
+  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]
+
   override def shutdown(){
-    val justShutdown = initiateShutdown()
-    if (justShutdown && isInterruptible)
-      simpleConsumer.disconnectToHandleJavaIOBug()
+    initiateShutdown()
     inLock(partitionMapLock) {
       partitionMapCond.signalAll()
     }
     awaitShutdown()
-    simpleConsumer.close()
   }
 
   override def doWork() {
-    var fetchRequest: FetchRequest = null
-
-    inLock(partitionMapLock) {
-      partitionMap.foreach {
-        case((topicAndPartition, partitionFetchState)) =>
-          if(partitionFetchState.isActive)
-            fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-              partitionFetchState.offset, fetchSize)
-      }
 
-      fetchRequest = fetchRequestBuilder.build()
-      if (fetchRequest.requestInfo.isEmpty) {
+    val fetchRequest = inLock(partitionMapLock) {
+      val fetchRequest = buildFetchRequest(partitionMap)
+      if (fetchRequest.isEmpty) {
         trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       }
+      fetchRequest
     }
 
-    if(!fetchRequest.requestInfo.isEmpty)
+    if (!fetchRequest.isEmpty)
       processFetchRequest(fetchRequest)
   }
 
-  private def processFetchRequest(fetchRequest: FetchRequest) {
+  private def processFetchRequest(fetchRequest: REQ) {
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
-    var response: FetchResponse = null
+    var responseData: Map[TopicAndPartition, PD] = Map.empty
+
     try {
       trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
-      response = simpleConsumer.fetch(fetchRequest)
+      responseData = fetch(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
@@ -128,64 +113,64 @@ abstract class AbstractFetcherThread(name: String,
     }
     fetcherStats.requestRate.mark()
 
-    if (response != null) {
+    if (responseData.nonEmpty) {
       // process fetched data
       inLock(partitionMapLock) {
-        response.data.foreach {
-          case(topicAndPartition, partitionData) =>
-            val (topic, partitionId) = topicAndPartition.asTuple
-            partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
-              // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-              if (fetchRequest.requestInfo(topicAndPartition).offset == currentPartitionFetchState.offset) {
-                partitionData.error match {
-                  case ErrorMapping.NoError =>
-                    try {
-                      val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                      val validBytes = messages.validBytes
-                      val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                        case Some(m: MessageAndOffset) => m.nextOffset
-                        case None => currentPartitionFetchState.offset
-                      }
-                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
-                      fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                      fetcherStats.byteRate.mark(validBytes)
-                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                      processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
-                    } catch {
-                      case ime: InvalidMessageException =>
-                        // we log the error and continue. This ensures two things
-                        // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
-                        // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
-                        // should get fixed in the subsequent fetches
-                        logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
-                      case e: Throwable =>
-                        throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                          .format(topic, partitionId, currentPartitionFetchState.offset), e)
-                    }
-                  case ErrorMapping.OffsetOutOfRangeCode =>
-                    try {
-                      val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
-                      error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                        .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
-                    } catch {
-                      case e: Throwable =>
-                        error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
-                        partitionsWithError += topicAndPartition
+
+        responseData.foreach { case (topicAndPartition, partitionData) =>
+          val TopicAndPartition(topic, partitionId) = topicAndPartition
+          partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
+            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+            if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
+              Errors.forCode(partitionData.errorCode) match {
+                case Errors.NONE =>
+                  try {
+                    val messages = partitionData.toByteBufferMessageSet
+                    val validBytes = messages.validBytes
+                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
+                      case Some(m: MessageAndOffset) => m.nextOffset
+                      case None => currentPartitionFetchState.offset
                     }
-                  case _ =>
-                    if (isRunning.get) {
-                      error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
-                        ErrorMapping.exceptionFor(partitionData.error).getClass))
+                    partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.highWatermark - newOffset
+                    fetcherStats.byteRate.mark(validBytes)
+                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                    processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
+                  } catch {
+                    case ime: InvalidMessageException =>
+                      // we log the error and continue. This ensures two things:
+                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partitions to also lag
+                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+                      // it should get fixed in subsequent fetches
+                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
+                    case e: Throwable =>
+                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
+                        .format(topic, partitionId, currentPartitionFetchState.offset), e)
+                  }
+                case Errors.OFFSET_OUT_OF_RANGE =>
+                  try {
+                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                    partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                      .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+                  } catch {
+                    case e: Throwable =>
+                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
-                    }
-                }
+                  }
+                case _ =>
+                  if (isRunning.get) {
+                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                      partitionData.exception.get))
+                    partitionsWithError += topicAndPartition
+                  }
+              }
             })
         }
       }
     }
 
-    if(partitionsWithError.size > 0) {
+    if (partitionsWithError.nonEmpty) {
       debug("handling partitions with error for %s".format(partitionsWithError))
       handlePartitionsWithErrors(partitionsWithError)
     }
@@ -203,9 +188,7 @@ abstract class AbstractFetcherThread(name: String,
             else new PartitionFetchState(offset)
           )}
       partitionMapCond.signalAll()
-    } finally {
-      partitionMapLock.unlock()
-    }
+    } finally partitionMapLock.unlock()
   }
 
   def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
@@ -213,33 +196,42 @@ abstract class AbstractFetcherThread(name: String,
     try {
       for (partition <- partitions) {
         partitionMap.get(partition).foreach (currentPartitionFetchState =>
-          if(currentPartitionFetchState.isActive)
+          if (currentPartitionFetchState.isActive)
             partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
         )
       }
       partitionMapCond.signalAll()
-    } finally {
-      partitionMapLock.unlock()
-    }
+    } finally partitionMapLock.unlock()
   }
 
   def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
     partitionMapLock.lockInterruptibly()
-    try {
-      topicAndPartitions.foreach(tp => partitionMap.remove(tp))
-    } finally {
-      partitionMapLock.unlock()
-    }
+    try topicAndPartitions.foreach(partitionMap.remove)
+    finally partitionMapLock.unlock()
   }
 
   def partitionCount() = {
     partitionMapLock.lockInterruptibly()
-    try {
-      partitionMap.size
-    } finally {
-      partitionMapLock.unlock()
-    }
+    try partitionMap.size
+    finally partitionMapLock.unlock()
   }
+
+}
+
+object AbstractFetcherThread {
+
+  trait FetchRequest {
+    def isEmpty: Boolean
+    def offset(topicAndPartition: TopicAndPartition): Long
+  }
+
+  trait PartitionData {
+    def errorCode: Short
+    def exception: Option[Throwable]
+    def toByteBufferMessageSet: ByteBufferMessageSet
+    def highWatermark: Long
+  }
+
 }
 
 class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
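
The FetchRequest and PartitionData traits introduced above are the entire contract a fetcher implementation has to satisfy, which is what lets AbstractFetcherThread stay agnostic of whether the old Scala protocol classes (ConsumerFetcherThread) or the new Java protocol classes (ReplicaFetcherThread) sit underneath. A minimal sketch of an implementation; StubFetchRequest and StubPartitionData are hypothetical names, assuming the kafka core and clients jars are on the classpath:

    import kafka.common.TopicAndPartition
    import kafka.message.ByteBufferMessageSet
    import kafka.server.AbstractFetcherThread
    import org.apache.kafka.common.protocol.Errors

    // Hypothetical stub: carries a pre-built message set, high watermark and error code.
    class StubPartitionData(messages: ByteBufferMessageSet, hw: Long, error: Short)
        extends AbstractFetcherThread.PartitionData {
      def errorCode: Short = error
      def exception: Option[Throwable] = Errors.forCode(error) match {
        case Errors.NONE => None
        case e => Some(e.exception)
      }
      def toByteBufferMessageSet: ByteBufferMessageSet = messages
      def highWatermark: Long = hw
    }

    // Hypothetical stub: a fetch request reduced to its per-partition offsets.
    class StubFetchRequest(offsets: Map[TopicAndPartition, Long])
        extends AbstractFetcherThread.FetchRequest {
      def isEmpty: Boolean = offsets.isEmpty
      def offset(topicAndPartition: TopicAndPartition): Long = offsets(topicAndPartition)
    }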

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 30406ce..f3f1fa6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -178,7 +178,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         socketServer.startup()
 
         /* start replica manager */
-        replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkClient, kafkaScheduler, logManager,
+          isShuttingDown)
         replicaManager.startup()
 
         /* start kafka controller */

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index ef38ed3..6e845e9 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -18,13 +18,16 @@
 package kafka.server
 
 import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
 
-class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
+class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time)
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
                                        "Replica", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig,
+      replicaMgr, metrics, time)
   }
 
   def shutdown() {
@@ -32,4 +35,4 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
     closeAllFetchers()
     info("shutdown completed")
   }  
-}
\ No newline at end of file
+}
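
The manager change is pure plumbing: the Metrics registry and the clients-side Time are created once per broker and handed to every fetcher thread. A hypothetical wiring helper (buildFetcherManager is not part of the patch) showing the new constructor:

    import kafka.server.{KafkaConfig, ReplicaFetcherManager, ReplicaManager}
    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.utils.{SystemTime, Time}

    // Hypothetical helper: one Metrics registry and one clock per broker,
    // shared by every ReplicaFetcherThread the manager spawns.
    def buildFetcherManager(config: KafkaConfig, replicaMgr: ReplicaManager,
                            metrics: Metrics = new Metrics(),
                            time: Time = new SystemTime()): ReplicaFetcherManager =
      new ReplicaFetcherManager(config, replicaMgr, metrics, time)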

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 711d749..6c85e52 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,47 +17,96 @@
 
 package kafka.server
 
+import java.net.SocketTimeoutException
+
 import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{KAFKA_083, OffsetRequest, FetchResponsePartitionData}
+import kafka.api.KAFKA_083
 import kafka.common.{KafkaStorageException, TopicAndPartition}
+import ReplicaFetcherThread._
+import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
+import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector}
+import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest}
+import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.security.ssl.SSLFactory
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{JavaConverters, Map, mutable}
+import JavaConverters._
 
-class ReplicaFetcherThread(name:String,
+class ReplicaFetcherThread(name: String,
                            sourceBroker: BrokerEndPoint,
                            brokerConfig: KafkaConfig,
-                           replicaMgr: ReplicaManager)
+                           replicaMgr: ReplicaManager,
+                           metrics: Metrics,
+                           time: Time)
   extends AbstractFetcherThread(name = name,
                                 clientId = name,
                                 sourceBroker = sourceBroker,
-                                socketTimeout = brokerConfig.replicaSocketTimeoutMs,
-                                socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,
-                                fetchSize = brokerConfig.replicaFetchMaxBytes,
-                                fetcherBrokerId = brokerConfig.brokerId,
-                                maxWait = brokerConfig.replicaFetchWaitMaxMs,
-                                minBytes = brokerConfig.replicaFetchMinBytes,
                                 fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
-                                fetchRequestVersion =
-                                        if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0,
                                 isInterruptible = false) {
 
+  type REQ = FetchRequest
+  type PD = PartitionData
+
+  private val fetchRequestVersion: Short = if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0
+  private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
+  private val replicaId = brokerConfig.brokerId
+  private val maxWait = brokerConfig.replicaFetchWaitMaxMs
+  private val minBytes = brokerConfig.replicaFetchMinBytes
+  private val fetchSize = brokerConfig.replicaFetchMaxBytes
+
+  private def clientId = name
+
+  private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
+
+  private val networkClient = {
+    val selector = new Selector(
+      NetworkReceive.UNLIMITED,
+      brokerConfig.connectionsMaxIdleMs,
+      metrics,
+      time,
+      "replica-fetcher",
+      Map("broker-id" -> sourceBroker.id.toString).asJava,
+      false,
+      ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, brokerConfig.channelConfigs)
+    )
+    new NetworkClient(
+      selector,
+      new ManualMetadataUpdater(),
+      clientId,
+      1,
+      0,
+      Selectable.USE_DEFAULT_BUFFER_SIZE,
+      brokerConfig.replicaSocketReceiveBufferBytes
+    )
+  }
+
+  override def shutdown(): Unit = {
+    super.shutdown()
+    networkClient.close()
+  }
+
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
+  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) {
     try {
-      val topic = topicAndPartition.topic
-      val partitionId = topicAndPartition.partition
+      val TopicAndPartition(topic, partitionId) = topicAndPartition
       val replica = replicaMgr.getReplica(topic, partitionId).get
-      val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+      val messageSet = partitionData.toByteBufferMessageSet
 
       if (fetchOffset != replica.logEndOffset.messageOffset)
         throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
       trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-            .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
+            .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark))
       replica.log.get.append(messageSet, assignOffsets = false)
       trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
             .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition))
-      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw)
+      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
       // for the follower replica, we do not need to keep
       // its segment base offset and physical position;
       // these values will be computed upon becoming the leader
@@ -87,7 +136,9 @@ class ReplicaFetcherThread(name:String,
      *
      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
      */
-    val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
+    val leaderEndOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP,
+      brokerConfig.brokerId)
+
     if (leaderEndOffset < replica.logEndOffset.messageOffset) {
       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
@@ -112,7 +163,8 @@ class ReplicaFetcherThread(name:String,
        *
        * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
        */
-      val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
+      val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
+        brokerConfig.brokerId)
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
         .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
       replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
@@ -124,4 +176,85 @@ class ReplicaFetcherThread(name:String,
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
     delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
+
+  protected def fetch(fetchRequest: FetchRequest): Map[TopicAndPartition, PartitionData] = {
+    val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
+    new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) =>
+      TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
+    }
+  }
+
+  private def sendRequest(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest): ClientResponse = {
+    import kafka.utils.NetworkClientBlockingOps._
+    val header = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
+    try {
+      if (!networkClient.blockingReady(sourceNode, socketTimeout)(time))
+        throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
+      else {
+        val send = new RequestSend(sourceBroker.id.toString, header, request.toStruct)
+        val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
+        networkClient.blockingSendAndReceive(clientRequest, socketTimeout)(time).getOrElse {
+          throw new SocketTimeoutException(s"No response received within $socketTimeout ms")
+        }
+      }
+    }
+    catch {
+      case e: Throwable =>
+        networkClient.close(sourceBroker.id.toString)
+        throw e
+    }
+
+  }
+
+  private def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
+    val topicPartition = new TopicPartition(topicAndPartition.topic, topicAndPartition.partition)
+    val partitions = Map(
+      topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)
+    )
+    val request = new ListOffsetRequest(consumerId, partitions.asJava)
+    val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, None, request)
+    val response = new ListOffsetResponse(clientResponse.responseBody)
+    val partitionData = response.responseData.get(topicPartition)
+    Errors.forCode(partitionData.errorCode) match {
+      case Errors.NONE => partitionData.offsets.asScala.head
+      case errorCode => throw errorCode.exception
+    }
+  }
+
+  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
+    val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]
+
+    partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) =>
+      if (partitionFetchState.isActive)
+        requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
+    }
+
+    new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))
+  }
+
+}
+
+object ReplicaFetcherThread {
+
+  private[server] class FetchRequest(val underlying: JFetchRequest) extends AbstractFetcherThread.FetchRequest {
+    def isEmpty: Boolean = underlying.fetchData.isEmpty
+    def offset(topicAndPartition: TopicAndPartition): Long =
+      underlying.fetchData.asScala(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition)).offset
+  }
+
+  private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
+
+    def errorCode: Short = underlying.errorCode
+
+    def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet(underlying.recordSet)
+
+    def highWatermark: Long = underlying.highWatermark
+
+    def exception: Option[Throwable] = Errors.forCode(errorCode) match {
+      case Errors.NONE => None
+      case e => Some(e.exception)
+    }
+
+  }
+
 }
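
The sendRequest helper above is the piece that actually replaces SimpleConsumer: it drives the asynchronous NetworkClient synchronously through the NetworkClientBlockingOps extension methods. The same pattern condensed into a free-standing sketch; blockingCall is a hypothetical name, but blockingReady, blockingSendAndReceive and close are exactly the calls the diff uses:

    import java.net.SocketTimeoutException
    import kafka.utils.NetworkClientBlockingOps._
    import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
    import org.apache.kafka.common.Node
    import org.apache.kafka.common.requests.RequestSend
    import org.apache.kafka.common.utils.Time

    // Block until the connection is ready, send exactly one request, block for its
    // response, and close the connection on any failure so the next call reconnects.
    def blockingCall(client: NetworkClient, node: Node, send: RequestSend,
                     timeoutMs: Int, time: Time): ClientResponse = {
      try {
        if (!client.blockingReady(node, timeoutMs)(time))
          throw new SocketTimeoutException(s"Failed to connect within $timeoutMs ms")
        val request = new ClientRequest(time.milliseconds(), true, send, null)
        client.blockingSendAndReceive(request, timeoutMs)(time).getOrElse {
          throw new SocketTimeoutException(s"No response received within $timeoutMs ms")
        }
      } catch {
        case e: Throwable =>
          client.close(node.id.toString)
          throw e
      }
    }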

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c195536..3e287ea 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,7 +30,9 @@ import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.{Time => JTime}
 
 import scala.collection._
 
@@ -94,7 +96,9 @@ object ReplicaManager {
 }
 
 class ReplicaManager(val config: KafkaConfig,
-                     private val time: Time,
+                     metrics: Metrics,
+                     time: Time,
+                     jTime: JTime,
                      val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
@@ -104,7 +108,7 @@ class ReplicaManager(val config: KafkaConfig,
   private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[(String, Int), Partition]
   private val replicaStateChangeLock = new Object
-  val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
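
ReplicaManager now carries two clocks: the core's kafka.utils.Time keeps driving the scheduler and log work, while the clients' Time (aliased JTime) is what the NetworkClient-based fetcher threads consume. A hypothetical construction mirroring the KafkaServer change above:

    import java.util.concurrent.atomic.AtomicBoolean
    import kafka.log.LogManager
    import kafka.server.{KafkaConfig, ReplicaManager}
    import kafka.utils.{KafkaScheduler, SystemTime}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.utils.{SystemTime => JSystemTime}

    // Hypothetical wiring: core clock for scheduler/log, clients clock for fetchers.
    def buildReplicaManager(config: KafkaConfig, zkClient: ZkClient,
                            scheduler: KafkaScheduler, logManager: LogManager,
                            isShuttingDown: AtomicBoolean): ReplicaManager =
      new ReplicaManager(config, new Metrics(), SystemTime, new JSystemTime(),
        zkClient, scheduler, logManager, isShuttingDown)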

