kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-8443; Broker support for fetch from followers (#6832)
Date: Thu, 04 Jul 2019 15:19:11 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 23beeea  KAFKA-8443; Broker support for fetch from followers (#6832)
23beeea is described below

commit 23beeea34bf9e5e7a33dc0566d54a17b2b417c0f
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Thu Jul 4 11:18:51 2019 -0400

    KAFKA-8443; Broker support for fetch from followers (#6832)
    
    A follow-on to #6731, this PR adds broker-side support for [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) (fetch from followers).
    
    Changes:
    * All brokers will handle FetchRequest regardless of leadership
    * Leaders can compute a preferred replica to return to the client
    * New ReplicaSelector interface for determining the preferred replica
    * Incremental fetches will include partitions with no records if the preferred replica has been computed
    * Adds a new JMX attribute to expose the current preferred read replica of a partition in the consumer
    
    Two new conditions were added for completing a delayed fetch. They both relate to communicating the high watermark to followers without waiting for a timeout:
    * For regular fetches, if the high watermark changes within a single fetch request
    * For incremental fetch sessions, if the follower's high watermark is lower than the leader's
    
    A new JMX attribute `preferred-read-replica` was added to the `kafka.consumer:type=consumer-fetch-manager-metrics,client-id=some-consumer,topic=my-topic,partition=0` object. It was added to support the new system test, which verifies that fetch-from-follower behavior works end-to-end. The attribute could also be useful in the future when debugging consumer problems.
    
    Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
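
For readers who want to inspect the `preferred-read-replica` attribute described above, a minimal JMX lookup might look like the sketch below. This is not part of the commit; it assumes it runs in the same JVM as the consumer (it queries the platform MBean server directly), and the client id, topic, and partition in the object name are placeholders copied from the example in the commit message.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class PreferredReadReplicaProbe {
    public static void main(String[] args) throws Exception {
        // In-process lookup; a remote JMX connection would be needed otherwise.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName metric = new ObjectName(
            "kafka.consumer:type=consumer-fetch-manager-metrics," +
            "client-id=some-consumer,topic=my-topic,partition=0");
        // The gauge reports the broker id of the current read replica,
        // or -1 when the consumer is fetching from the partition leader.
        Object replicaId = server.getAttribute(metric, "preferred-read-replica");
        System.out.println("preferred-read-replica = " + replicaId);
    }
}
```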
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  34 +++-
 .../consumer/internals/FetcherMetricsRegistry.java |   8 +-
 .../kafka/common/replica/ClientMetadata.java       | 124 ++++++++++++
 .../apache/kafka/common/replica/PartitionView.java |  72 +++++++
 .../common/replica/RackAwareReplicaSelector.java   |  54 ++++++
 .../kafka/common/replica/ReplicaSelector.java      |  49 +++++
 .../apache/kafka/common/replica/ReplicaView.java   | 105 ++++++++++
 .../apache/kafka/common/requests/FetchRequest.java |   1 +
 .../kafka/common/requests/FetchResponse.java       |   4 +-
 .../kafka/common/replica/ReplicaSelectorTest.java  |  88 +++++++++
 core/src/main/scala/kafka/cluster/Partition.scala  |   6 +-
 core/src/main/scala/kafka/cluster/Replica.scala    |  20 ++
 core/src/main/scala/kafka/log/Log.scala            |  68 +++++--
 .../src/main/scala/kafka/server/DelayedFetch.scala |  15 +-
 .../src/main/scala/kafka/server/FetchSession.scala |   4 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |  28 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 +
 .../main/scala/kafka/server/MetadataCache.scala    |  19 ++
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  18 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 213 ++++++++++++++++----
 .../kafka/server/DelayedFetchTest.scala            | 115 ++++++++++-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  17 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  31 +++
 .../scala/unit/kafka/server/FetchRequestTest.scala |   6 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   6 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   1 +
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   2 +
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  18 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 214 ++++++++++++++++++---
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |   6 +-
 tests/kafkatest/services/console_consumer.py       |   3 +-
 tests/kafkatest/services/kafka/kafka.py            |   9 +-
 tests/kafkatest/services/monitor/jmx.py            |   7 +-
 tests/kafkatest/tests/client/truncation_test.py    |   1 -
 34 files changed, 1236 insertions(+), 134 deletions(-)
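
The diffs below add the pluggable `ReplicaSelector` interface, a `RackAwareReplicaSelector` implementation, and the new broker config `replica.selector.class` that selects which implementation to load. As a rough sketch of how a custom selector could use this API (the class and package names here are hypothetical, not part of the commit), a selector that ignores the client's rack and always prefers the most caught-up replica might look like:

```java
package example;

import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.replica.PartitionView;
import org.apache.kafka.common.replica.ReplicaSelector;
import org.apache.kafka.common.replica.ReplicaView;

/**
 * Hypothetical selector: always return the most caught-up replica, falling back
 * to the leader if no replicas are reported. Assuming the class is on the broker
 * classpath, it would be enabled with
 * replica.selector.class=example.MostCaughtUpReplicaSelector.
 */
public class MostCaughtUpReplicaSelector implements ReplicaSelector {

    @Override
    public Optional<ReplicaView> select(TopicPartition topicPartition,
                                        ClientMetadata clientMetadata,
                                        PartitionView partitionView) {
        // ReplicaView.comparator() orders replicas by log end offset, then by how
        // recently they caught up to the high watermark, then by broker id.
        Optional<ReplicaView> mostCaughtUp = partitionView.replicas().stream()
                .max(ReplicaView.comparator());
        return mostCaughtUp.isPresent() ? mostCaughtUp : Optional.of(partitionView.leader());
    }
}
```

The commit's own `RackAwareReplicaSelector` (added below) follows the same pattern, but first narrows the candidate set to replicas whose rack matches the rack id sent by the client.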

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index cff6e30..08ab9fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -1706,8 +1708,20 @@ public class Fetcher<K, V> implements Closeable {
                     if (!newAssignedPartitions.contains(tp)) {
                         metrics.removeSensor(partitionLagMetricName(tp));
                         metrics.removeSensor(partitionLeadMetricName(tp));
+                        metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
                     }
                 }
+
+                for (TopicPartition tp : newAssignedPartitions) {
+                    if (!this.assignedPartitions.contains(tp)) {
+                        MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
+                        if (metrics.metric(metricName) == null) {
+                            metrics.addMetric(metricName, (Gauge<Integer>) (config, now) ->
+                                subscription.preferredReadReplica(tp, 0L).orElse(-1));
+                        }
+                    }
+                }
+
                 this.assignedPartitions = newAssignedPartitions;
                 this.assignmentId = newAssignmentId;
             }
@@ -1719,9 +1733,7 @@ public class Fetcher<K, V> implements Closeable {
             String name = partitionLeadMetricName(tp);
             Sensor recordsLead = this.metrics.getSensor(name);
             if (recordsLead == null) {
-                Map<String, String> metricTags = new HashMap<>(2);
-                metricTags.put("topic", tp.topic().replace('.', '_'));
-                metricTags.put("partition", String.valueOf(tp.partition()));
+                Map<String, String> metricTags = topicPartitionTags(tp);
 
                 recordsLead = this.metrics.sensor(name);
 
@@ -1738,10 +1750,7 @@ public class Fetcher<K, V> implements Closeable {
             String name = partitionLagMetricName(tp);
             Sensor recordsLag = this.metrics.getSensor(name);
             if (recordsLag == null) {
-                Map<String, String> metricTags = new HashMap<>(2);
-                metricTags.put("topic", tp.topic().replace('.', '_'));
-                metricTags.put("partition", String.valueOf(tp.partition()));
-
+                Map<String, String> metricTags = topicPartitionTags(tp);
                 recordsLag = this.metrics.sensor(name);
 
                 recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
@@ -1759,6 +1768,17 @@ public class Fetcher<K, V> implements Closeable {
             return tp + ".records-lead";
         }
 
+        private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) {
+            Map<String, String> metricTags = topicPartitionTags(tp);
+            return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags);
+        }
+
+        private Map<String, String> topicPartitionTags(TopicPartition tp) {
+            Map<String, String> metricTags = new HashMap<>(2);
+            metricTags.put("topic", tp.topic().replace('.', '_'));
+            metricTags.put("partition", String.valueOf(tp.partition()));
+            return metricTags;
+        }
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index f869615..501ffe9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -54,6 +54,7 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLead;
     public MetricNameTemplate partitionRecordsLeadMin;
     public MetricNameTemplate partitionRecordsLeadAvg;
+    public MetricNameTemplate partitionPreferredReadReplica;
 
     public FetcherMetricsRegistry() {
         this(new HashSet<String>(), "");
@@ -139,7 +140,9 @@ public class FetcherMetricsRegistry {
                 "The min lead of the partition", partitionTags);
         this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
                 "The average lead of the partition", partitionTags);
-
+        this.partitionPreferredReadReplica = new MetricNameTemplate(
+                "preferred-read-replica", "consumer-fetch-manager-metrics",
+                "The current read replica for the partition, or -1 if reading from leader", partitionTags);
     }
 
     public List<MetricNameTemplate> getAllTemplates() {
@@ -171,7 +174,8 @@ public class FetcherMetricsRegistry {
             partitionRecordsLagMax,
             partitionRecordsLead,
             partitionRecordsLeadMin,
-            partitionRecordsLeadAvg
+            partitionRecordsLeadAvg,
+            partitionPreferredReadReplica
         );
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/replica/ClientMetadata.java b/clients/src/main/java/org/apache/kafka/common/replica/ClientMetadata.java
new file mode 100644
index 0000000..b328733
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/replica/ClientMetadata.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.replica;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+/**
+ * Holder for all the client metadata required to determine a preferred replica.
+ */
+public interface ClientMetadata {
+
+    /**
+     * Rack ID sent by the client
+     */
+    String rackId();
+
+    /**
+     * Client ID sent by the client
+     */
+    String clientId();
+
+    /**
+     * Incoming address of the client
+     */
+    InetAddress clientAddress();
+
+    /**
+     * Security principal of the client
+     */
+    KafkaPrincipal principal();
+
+    /**
+     * Listener name for the client
+     */
+    String listenerName();
+
+
+    class DefaultClientMetadata implements ClientMetadata {
+        private final String rackId;
+        private final String clientId;
+        private final InetAddress clientAddress;
+        private final KafkaPrincipal principal;
+        private final String listenerName;
+
+        public DefaultClientMetadata(String rackId, String clientId, InetAddress clientAddress,
+                                     KafkaPrincipal principal, String listenerName) {
+            this.rackId = rackId;
+            this.clientId = clientId;
+            this.clientAddress = clientAddress;
+            this.principal = principal;
+            this.listenerName = listenerName;
+        }
+
+        @Override
+        public String rackId() {
+            return rackId;
+        }
+
+        @Override
+        public String clientId() {
+            return clientId;
+        }
+
+        @Override
+        public InetAddress clientAddress() {
+            return clientAddress;
+        }
+
+        @Override
+        public KafkaPrincipal principal() {
+            return principal;
+        }
+
+        @Override
+        public String listenerName() {
+            return listenerName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DefaultClientMetadata that = (DefaultClientMetadata) o;
+            return Objects.equals(rackId, that.rackId) &&
+                    Objects.equals(clientId, that.clientId) &&
+                    Objects.equals(clientAddress, that.clientAddress) &&
+                    Objects.equals(principal, that.principal) &&
+                    Objects.equals(listenerName, that.listenerName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(rackId, clientId, clientAddress, principal, listenerName);
+        }
+
+        @Override
+        public String toString() {
+            return "DefaultClientMetadata{" +
+                    "rackId='" + rackId + '\'' +
+                    ", clientId='" + clientId + '\'' +
+                    ", clientAddress=" + clientAddress +
+                    ", principal=" + principal +
+                    ", listenerName='" + listenerName + '\'' +
+                    '}';
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/replica/PartitionView.java b/clients/src/main/java/org/apache/kafka/common/replica/PartitionView.java
new file mode 100644
index 0000000..8174e63
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/replica/PartitionView.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.replica;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * View of a partition used by {@link ReplicaSelector} to determine a preferred replica.
+ */
+public interface PartitionView {
+    Set<ReplicaView> replicas();
+
+    ReplicaView leader();
+
+    class DefaultPartitionView implements PartitionView {
+        private final Set<ReplicaView> replicas;
+        private final ReplicaView leader;
+
+        public DefaultPartitionView(Set<ReplicaView> replicas, ReplicaView leader) {
+            this.replicas = Collections.unmodifiableSet(replicas);
+            this.leader = leader;
+        }
+
+        @Override
+        public Set<ReplicaView> replicas() {
+            return replicas;
+        }
+
+        @Override
+        public ReplicaView leader() {
+            return leader;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DefaultPartitionView that = (DefaultPartitionView) o;
+            return Objects.equals(replicas, that.replicas) &&
+                    Objects.equals(leader, that.leader);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(replicas, leader);
+        }
+
+        @Override
+        public String toString() {
+            return "DefaultPartitionView{" +
+                    "replicas=" + replicas +
+                    ", leader=" + leader +
+                    '}';
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/replica/RackAwareReplicaSelector.java b/clients/src/main/java/org/apache/kafka/common/replica/RackAwareReplicaSelector.java
new file mode 100644
index 0000000..8ae6872
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/replica/RackAwareReplicaSelector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.replica;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Returns a replica whose rack id is equal to the rack id specified in the client request metadata. If no such replica
+ * is found, returns the leader.
+ */
+public class RackAwareReplicaSelector implements ReplicaSelector {
+
+    @Override
+    public Optional<ReplicaView> select(TopicPartition topicPartition,
+                                        ClientMetadata clientMetadata,
+                                        PartitionView partitionView) {
+        if (clientMetadata.rackId() != null && !clientMetadata.rackId().isEmpty()) {
+            Set<ReplicaView> sameRackReplicas = partitionView.replicas().stream()
+                    .filter(replicaInfo -> clientMetadata.rackId().equals(replicaInfo.endpoint().rack()))
+                    .collect(Collectors.toSet());
+            if (sameRackReplicas.isEmpty()) {
+                return Optional.of(partitionView.leader());
+            } else {
+                if (sameRackReplicas.contains(partitionView.leader())) {
+                    // Use the leader if it's in this rack
+                    return Optional.of(partitionView.leader());
+                } else {
+                    // Otherwise, get the most caught-up replica
+                    return sameRackReplicas.stream().max(ReplicaView.comparator());
+                }
+            }
+        } else {
+            return Optional.of(partitionView.leader());
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/replica/ReplicaSelector.java b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaSelector.java
new file mode 100644
index 0000000..301fc9f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaSelector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.replica;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Plug-able interface for selecting a preferred read replica given the current set of replicas for a partition
+ * and metadata from the client.
+ */
+public interface ReplicaSelector extends Configurable, Closeable {
+
+    /**
+     * Select the preferred replica a client should use for fetching. If no replica is available, this will return an
+     * empty optional.
+     */
+    Optional<ReplicaView> select(TopicPartition topicPartition,
+                                 ClientMetadata clientMetadata,
+                                 PartitionView partitionView);
+    @Override
+    default void close() throws IOException {
+        // No-op by default
+    }
+
+    @Override
+    default void configure(Map<String, ?> configs) {
+        // No-op by default
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/replica/ReplicaView.java b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaView.java
new file mode 100644
index 0000000..69c6cf7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaView.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.replica;
+
+import org.apache.kafka.common.Node;
+
+import java.util.Comparator;
+import java.util.Objects;
+
+/**
+ * View of a replica used by {@link ReplicaSelector} to determine a preferred replica.
+ */
+public interface ReplicaView {
+
+    /**
+     * The endpoint information for this replica (hostname, port, rack, etc)
+     */
+    Node endpoint();
+
+    /**
+     * The log end offset for this replica
+     */
+    long logEndOffset();
+
+    /**
+     * The number of milliseconds (if any) since the last time this replica was caught up to the high watermark.
+     * For a leader replica, this is always zero.
+     */
+    long timeSinceLastCaughtUpMs();
+
+    /**
+     * Comparator for ReplicaView that returns in the order of "most caught up". This is used for deterministic
+     * selection of a replica when there is a tie from a selector.
+     */
+    static Comparator<ReplicaView> comparator() {
+        return Comparator.comparingLong(ReplicaView::logEndOffset)
+            .thenComparing(Comparator.comparingLong(ReplicaView::timeSinceLastCaughtUpMs).reversed())
+            .thenComparing(replicaInfo -> replicaInfo.endpoint().id());
+    }
+
+    class DefaultReplicaView implements ReplicaView {
+        private final Node endpoint;
+        private final long logEndOffset;
+        private final long timeSinceLastCaughtUpMs;
+
+        public DefaultReplicaView(Node endpoint, long logEndOffset, long timeSinceLastCaughtUpMs) {
+            this.endpoint = endpoint;
+            this.logEndOffset = logEndOffset;
+            this.timeSinceLastCaughtUpMs = timeSinceLastCaughtUpMs;
+        }
+
+        @Override
+        public Node endpoint() {
+            return endpoint;
+        }
+
+        @Override
+        public long logEndOffset() {
+            return logEndOffset;
+        }
+
+        @Override
+        public long timeSinceLastCaughtUpMs() {
+            return timeSinceLastCaughtUpMs;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DefaultReplicaView that = (DefaultReplicaView) o;
+            return logEndOffset == that.logEndOffset &&
+                    Objects.equals(endpoint, that.endpoint) &&
+                    Objects.equals(timeSinceLastCaughtUpMs, that.timeSinceLastCaughtUpMs);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(endpoint, logEndOffset, timeSinceLastCaughtUpMs);
+        }
+
+        @Override
+        public String toString() {
+            return "DefaultReplicaView{" +
+                    "endpoint=" + endpoint +
+                    ", logEndOffset=" + logEndOffset +
+                    ", timeSinceLastCaughtUpMs=" + timeSinceLastCaughtUpMs +
+                    '}';
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index da09df3..2c0455a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -195,6 +195,7 @@ public class FetchRequest extends AbstractRequest {
     // V10 bumped up to indicate ZStandard capability. (see KIP-110)
     private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
 
+    // V11 added rack ID to support read from followers (KIP-392)
     private static final Schema FETCH_REQUEST_V11 = new Schema(
             REPLICA_ID,
             MAX_WAIT_TIME,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 942b0d6..d387e81 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -144,6 +144,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             LOG_START_OFFSET,
             new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
 
+    // Introduced in V11 to support read from followers (KIP-392)
     private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V6 = new Schema(
             PARTITION_ID,
             ERROR_CODE,
@@ -207,6 +208,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
     // V10 bumped up to indicate ZStandard capability. (see KIP-110)
     private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
 
+    // V11 added preferred read replica for each partition response to support read from followers (KIP-392)
     private static final Schema FETCH_RESPONSE_V11 = new Schema(
             THROTTLE_TIME_MS,
             ERROR_CODE,
@@ -329,7 +331,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             result = 31 * result + Long.hashCode(highWatermark);
             result = 31 * result + Long.hashCode(lastStableOffset);
             result = 31 * result + Long.hashCode(logStartOffset);
-            result = 31 * result + (preferredReadReplica != null ? preferredReadReplica.hashCode() : 0);
+            result = 31 * result + Objects.hashCode(preferredReadReplica);
             result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
             result = 31 * result + (records != null ? records.hashCode() : 0);
             return result;
diff --git a/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java
new file mode 100644
index 0000000..da03e57
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.replica;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.test.TestUtils.assertOptional;
+import static org.junit.Assert.assertEquals;
+
+public class ReplicaSelectorTest {
+
+    @Test
+    public void testSameRackSelector() {
+        TopicPartition tp = new TopicPartition("test", 0);
+
+        List<ReplicaView> replicaViewSet = replicaInfoSet();
+        ReplicaView leader = replicaViewSet.get(0);
+        PartitionView partitionView = partitionInfo(new HashSet<>(replicaViewSet), leader);
+
+        ReplicaSelector selector = new RackAwareReplicaSelector();
+        Optional<ReplicaView> selected = selector.select(tp, metadata("rack-b"), partitionView);
+        assertOptional(selected, replicaInfo -> {
+            assertEquals("Expect replica to be in rack-b", replicaInfo.endpoint().rack(), "rack-b");
+            assertEquals("Expected replica 3 since it is more caught-up", replicaInfo.endpoint().id(), 3);
+        });
+
+        selected = selector.select(tp, metadata("not-a-rack"), partitionView);
+        assertOptional(selected, replicaInfo -> {
+            assertEquals("Expect leader when we can't find any nodes in given rack", replicaInfo, leader);
+        });
+
+        selected = selector.select(tp, metadata("rack-a"), partitionView);
+        assertOptional(selected, replicaInfo -> {
+            assertEquals("Expect replica to be in rack-a", replicaInfo.endpoint().rack(), "rack-a");
+            assertEquals("Expect the leader since it's in rack-a", replicaInfo, leader);
+        });
+
+
+    }
+
+    static List<ReplicaView> replicaInfoSet() {
+        return Stream.of(
+                replicaInfo(new Node(0, "host0", 1234, "rack-a"), 4, 0),
+                replicaInfo(new Node(1, "host1", 1234, "rack-a"), 2, 5),
+                replicaInfo(new Node(2, "host2", 1234, "rack-b"), 3, 3),
+                replicaInfo(new Node(3, "host3", 1234, "rack-b"), 4, 2)
+
+        ).collect(Collectors.toList());
+    }
+
+    static ReplicaView replicaInfo(Node node, long logOffset, long timeSinceLastCaughtUpMs) {
+        return new ReplicaView.DefaultReplicaView(node, logOffset, timeSinceLastCaughtUpMs);
+    }
+
+    static PartitionView partitionInfo(Set<ReplicaView> replicaViewSet, ReplicaView leader) {
+        return new PartitionView.DefaultPartitionView(replicaViewSet, leader);
+    }
+
+    static ClientMetadata metadata(String rack) {
+        return new ClientMetadata.DefaultClientMetadata(rack, "test-client",
+                InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, "TEST");
+    }
+}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ffce13..1965d7f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -519,16 +519,18 @@ class Partition(val topicPartition: TopicPartition,
 
       if (isNewLeader) {
         // construct the high watermark metadata for the new leader replica
-        leaderLog.maybeFetchHighWatermarkOffsetMetadata()
+        leaderLog.initializeHighWatermarkOffsetMetadata()
         // mark local replica as the leader after converting hw
         leaderReplicaIdOpt = Some(localBrokerId)
         // reset log end offset for remote replicas
-        remoteReplicas.foreach { _.updateFetchState(
+        remoteReplicas.foreach { replica =>
+          replica.updateFetchState(
             followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
             followerStartOffset = Log.UnknownOffset,
             followerFetchTimeMs = 0L,
             leaderEndOffset = Log.UnknownOffset
           )
+          replica.updateLastSentHighWatermark(0L)
         }
       }
       // we may need to increment high watermark since ISR could be down to 1
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 1c61fad..5504db5 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -42,6 +42,10 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
   // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
   @volatile private[this] var _lastCaughtUpTimeMs = 0L
 
+  // highWatermark is the leader's high watermark after the most recent FetchRequest from this follower. This is
+  // used to determine the maximum HW this follower knows about. See KIP-392
+  @volatile private[this] var _lastSentHighWatermark = 0L
+
   def logStartOffset: Long = _logStartOffset
 
   def logEndOffsetMetadata: LogOffsetMetadata = _logEndOffsetMetadata
@@ -50,6 +54,8 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
 
   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
+  def lastSentHighWatermark: Long = _lastSentHighWatermark
+
   /*
    * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
    * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
@@ -78,6 +84,19 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     trace(s"Updated state of replica to $this")
   }
 
+  /**
+    * Update the high watermark of this remote replica. This is used to track what we think is the last known HW to
+    * a remote follower. Since this is recorded when we send a response, there is no way to guarantee that the follower
+    * actually receives this HW. So we consider this to be an upper bound on what the follower knows.
+    *
+    * When handling fetches, the last sent high watermark for a replica is checked to see if we should return immediately
+    * in order to propagate the HW more expeditiously. See KIP-392
+    */
+  def updateLastSentHighWatermark(highWatermark: Long): Unit = {
+    _lastSentHighWatermark = highWatermark
+    trace(s"Updated HW of replica to $highWatermark")
+  }
+
   def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
     lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
     lastFetchTimeMs = curTimeMs
@@ -96,6 +115,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     replicaString.append(s", logEndOffsetMetadata=$logEndOffsetMetadata")
     replicaString.append(s", lastFetchLeaderLogEndOffset=$lastFetchLeaderLogEndOffset")
     replicaString.append(s", lastFetchTimeMs=$lastFetchTimeMs")
+    replicaString.append(s", lastSentHighWatermark=$lastSentHighWatermark")
     replicaString.append(")")
     replicaString.toString
   }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b76185e..31412bd 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -328,12 +328,14 @@ class Log(@volatile var dir: File,
    * Convert hw to local offset metadata by reading the log at the hw offset.
    * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
    */
-  def maybeFetchHighWatermarkOffsetMetadata(): Unit = {
+  def initializeHighWatermarkOffsetMetadata(): Unit = {
     if (highWatermarkMetadata.messageOffsetOnly) {
-      highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse {
-        convertToOffsetMetadata(logStartOffset).getOrElse {
-          val firstSegmentOffset = logSegments.head.baseOffset
-          LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
+      lock.synchronized {
+        highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse {
+          convertToOffsetMetadata(logStartOffset).getOrElse {
+            val firstSegmentOffset = logSegments.head.baseOffset
+            LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
+          }
         }
       }
     }
@@ -357,12 +359,40 @@ class Log(@volatile var dir: File,
 
   def lastStableOffsetLag: Long = highWatermark - lastStableOffset
 
+  /**
+    * Fully materialize and return an offset snapshot including segment position info. This method will update
+    * the LogOffsetMetadata for the high watermark and last stable offset if they are message-only. Throws an
+    * offset out of range error if the segment info cannot be loaded.
+    */
   def offsetSnapshot: LogOffsetSnapshot = {
+    var highWatermark = _highWatermarkMetadata
+    if (highWatermark.messageOffsetOnly) {
+      lock.synchronized {
+        val fullOffset = convertToOffsetMetadataOrThrow(_highWatermarkMetadata.messageOffset)
+        _highWatermarkMetadata = fullOffset
+        highWatermark = _highWatermarkMetadata
+      }
+    }
+
+    var lastStable: LogOffsetMetadata = lastStableOffsetMetadata
+    if (lastStable.messageOffsetOnly) {
+      lock synchronized {
+        firstUnstableOffset match {
+          case None => highWatermark
+          case Some(offsetMetadata) =>
+            val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
+            firstUnstableOffset = Some(fullOffset)
+            lastStable = fullOffset
+        }
+      }
+    }
+
     LogOffsetSnapshot(
-      logStartOffset = logStartOffset,
-      logEndOffset = logEndOffsetMetadata,
-      highWatermark =  highWatermarkMetadata,
-      lastStableOffset = lastStableOffsetMetadata)
+      logStartOffset,
+      logEndOffsetMetadata,
+      highWatermark,
+      lastStable
+    )
   }
 
   private val tags = {
@@ -1534,18 +1564,26 @@ class Log(@volatile var dir: File,
    */
   def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
     try {
-      val fetchDataInfo = read(offset,
-        maxLength = 1,
-        maxOffset = None,
-        minOneMessage = false,
-        includeAbortedTxns = false)
-      Some(fetchDataInfo.fetchOffsetMetadata)
+      Some(convertToOffsetMetadataOrThrow(offset))
     } catch {
       case _: OffsetOutOfRangeException => None
     }
   }
 
   /**
+    * Given a message offset, find its corresponding offset metadata in the log.
+    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    */
+  def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+    val fetchDataInfo = read(offset,
+      maxLength = 1,
+      maxOffset = None,
+      minOneMessage = false,
+      includeAbortedTxns = false)
+    fetchDataInfo.fetchOffsetMetadata
+  }
+
+  /**
    * Delete any log segments matching the given predicate function,
    * starting with the oldest segment and moving forward until a segment doesn't match.
    *
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 9020099..c3dbde9 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 
 import scala.collection._
@@ -59,6 +60,7 @@ class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
                    quota: ReplicaQuota,
+                   clientMetadata: Option[ClientMetadata],
                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
   extends DelayedOperation(delayMs) {
 
@@ -71,7 +73,7 @@ class DelayedFetch(delayMs: Long,
    * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
    * Case E: The partition is in an offline log directory on this broker
    * Case F: This broker is the leader, but the requested epoch is now fenced
-   *
+   * Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
    * Upon completion, should return whatever data is available for each valid partition
    */
   override def tryComplete(): Boolean = {
@@ -114,6 +116,14 @@ class DelayedFetch(delayMs: Long,
                   accumulatedSize += bytesAvailable
               }
             }
+
+            if (fetchMetadata.isFromFollower) {
+              // Case G check if the follower has the latest HW from the leader
+              if (partition.getReplica(fetchMetadata.replicaId)
+                .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
+                return forceComplete()
+              }
+            }
           }
         } catch {
           case _: KafkaStorageException => // Case E
@@ -157,11 +167,12 @@ class DelayedFetch(delayMs: Long,
       fetchMaxBytes = fetchMetadata.fetchMaxBytes,
       hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
       readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+      clientMetadata = clientMetadata,
       quota = quota)
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
       tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
-        result.lastStableOffset, result.info.abortedTransactions)
+        result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
     }
 
     responseCallback(fetchPartitionData)
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index fe97346..aa482ec 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -144,6 +144,10 @@ class CachedPartition(val topic: String,
       if (updateResponseData)
         localLogStartOffset = respData.logStartOffset
     }
+    if (respData.preferredReadReplica.isPresent) {
+      // If the broker computed a preferred read replica, we need to include it in the response
+      mustRespond = true
+    }
     if (respData.error.code != 0) {
       // Partitions with errors are always included in the response.
       // We also set the cached highWatermark to an invalid offset, -1.
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7ce7d99..698e5a9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -69,6 +69,8 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.replica.ClientMetadata
+import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
@@ -82,11 +84,13 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 
+import scala.compat.java8.OptionConverters._
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
+
 /**
  * Logic to handle the various Kafka requests
  */
@@ -577,6 +581,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       fetchRequest.toForget,
       fetchRequest.isFromFollower)
 
+    val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
+      // Fetch API version 11 added preferred replica logic
+      Some(new DefaultClientMetadata(
+        fetchRequest.rackId,
+        clientId,
+        request.context.clientAddress,
+        request.context.principal,
+        request.context.listenerName.value))
+    } else {
+      None
+    }
+
     def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
       new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
         FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
@@ -650,7 +666,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
                 // client.
                 new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-                  partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
+                  partitionData.lastStableOffset, partitionData.logStartOffset,
+                  partitionData.preferredReadReplica, partitionData.abortedTransactions,
                   new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
               } catch {
                 case e: UnsupportedCompressionTypeException =>
@@ -659,7 +676,8 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
             }
           case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-            partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
+            partitionData.lastStableOffset, partitionData.logStartOffset,
+            partitionData.preferredReadReplica, partitionData.abortedTransactions,
             unconvertedRecords)
         }
       }
@@ -672,7 +690,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
         val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
         partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
-          data.logStartOffset, abortedTransactions, data.records))
+          data.logStartOffset, data.preferredReadReplica.map(int2Integer).asJava,
+          abortedTransactions, data.records))
       }
       erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
 
@@ -769,7 +788,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         interesting,
         replicationQuota(fetchRequest),
         processResponseCallback,
-        fetchRequest.isolationLevel)
+        fetchRequest.isolationLevel,
+        clientMetadata)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5123771..4765031 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -370,6 +370,7 @@ object KafkaConfig {
   val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
   val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
   val InterBrokerListenerNameProp = "inter.broker.listener.name"
+  val ReplicaSelectorClassProp = "replica.selector.class"
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
@@ -699,6 +700,7 @@ object KafkaConfig {
   " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list."
   val InterBrokerListenerNameDoc = s"Name of listener used for communication between brokers. If this is unset, the listener name is defined by $InterBrokerSecurityProtocolProp. " +
     s"It is an error to set this and $InterBrokerSecurityProtocolProp properties at the same time."
+  val ReplicaSelectorClassDoc = "The fully qualified class name that implements ReplicaSelector. This is used by the broker to find the preferred read replica. By default, we use an implementation that returns the leader."
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
@@ -966,6 +968,7 @@ object KafkaConfig {
       .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
       .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc)
       .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
+      .define(ReplicaSelectorClassProp, STRING, null, MEDIUM, ReplicaSelectorClassDoc)
 
       /** ********* Controlled shutdown configuration ***********/
       .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
@@ -1186,6 +1189,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   /***************** rack configuration **************/
   val rack = Option(getString(KafkaConfig.RackProp))
+  val replicaSelectorClassName = Option(getString(KafkaConfig.ReplicaSelectorClassProp))
 
   /** ********* Log Configuration ***********/
   val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 2ec84f2..8b8e159 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
@@ -195,6 +196,24 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
+    val snapshot = metadataSnapshot
+    snapshot.partitionStates.get(tp.topic()).flatMap(_.get(tp.partition())).map { partitionInfo =>
+      val replicaIds = partitionInfo.basePartitionState.replicas
+      replicaIds.asScala
+        .map(replicaId => replicaId.intValue() -> {
+          snapshot.aliveBrokers.get(replicaId.longValue()) match {
+            case Some(broker) =>
+              broker.getNode(listenerName).getOrElse(Node.noNode())
+            case None =>
+              Node.noNode()
+          }}).toMap
+        .filter(pair => pair match {
+          case (_, node) => !node.isEmpty
+        })
+    }.getOrElse(Map.empty[Int, Node])
+  }
+
   def getControllerId: Option[Int] = metadataSnapshot.controllerId
 
   def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 18f89f7..46e90fc 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -22,7 +22,7 @@ import java.util.Optional
 
 import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
-import kafka.log.{LogAppendInfo, LogOffsetSnapshot}
+import kafka.log.LogAppendInfo
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.TopicPartition
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchResponse.PartitionData
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, Set, mutable}
@@ -89,7 +89,8 @@ class ReplicaAlterLogDirsThread(name: String,
       request.fetchData.asScala.toSeq,
       UnboundedQuota,
       processResponseCallback,
-      request.isolationLevel)
+      request.isolationLevel,
+      None)
 
     if (partitionData == null)
       throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
@@ -121,18 +122,13 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
-    val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
-    offsetSnapshot.logStartOffset
+    val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+    partition.localLogOrException.logStartOffset
   }
 
   override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
-    val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
-    offsetSnapshot.logEndOffset.messageOffset
-  }
-
-  private def offsetSnapshotFromCurrentReplica(topicPartition: TopicPartition, leaderEpoch: Int): LogOffsetSnapshot = {
     val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
-    partition.fetchOffsetSnapshot(Optional.of[Integer](leaderEpoch), fetchOnlyFromLeader = false)
+    partition.localLogOrException.logEndOffset
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 07b0928..bfb50b8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -34,12 +34,15 @@ import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -47,7 +50,10 @@ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
+import org.apache.kafka.common.replica.{ClientMetadata, _}
 
+import scala.compat.java8.OptionConverters._
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, Set, mutable}
 
@@ -75,7 +81,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
  * @param readSize amount of data that was read from the log i.e. size of the fetch
  * @param isReadFromLogEnd true if the request read up to the log end offset snapshot
  *                         when the read was initiated, false otherwise
- * @param error Exception if error encountered while reading from the log
+ * @param preferredReadReplica the preferred read replica to be used for future fetches
+ * @param exception Exception if error encountered while reading from the log
  */
 case class LogReadResult(info: FetchDataInfo,
                          highWatermark: Long,
@@ -85,6 +92,8 @@ case class LogReadResult(info: FetchDataInfo,
                          fetchTimeMs: Long,
                          readSize: Int,
                          lastStableOffset: Option[Long],
+                         preferredReadReplica: Option[Int] = None,
+                         followerNeedsHwUpdate: Boolean = false,
                          exception: Option[Throwable] = None) {
 
   def error: Errors = exception match {
@@ -106,7 +115,8 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
                               logStartOffset: Long,
                               records: Records,
                               lastStableOffset: Option[Long],
-                              abortedTransactions: Option[List[AbortedTransaction]])
+                              abortedTransactions: Option[List[AbortedTransaction]],
+                              preferredReadReplica: Option[Int])
 
 
 /**
@@ -216,6 +226,8 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()
+
   val leaderCount = newGauge(
     "LeaderCount",
     new Gauge[Int] {
@@ -807,8 +819,9 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Fetch messages from the leader replica, and wait until enough data can be fetched and return;
-   * the callback function will be triggered either when timeout or required fetch info is satisfied
+   * Fetch messages from a replica, and wait until enough data can be fetched and return;
+   * the callback function will be triggered either when the timeout expires or the required fetch info is satisfied.
+   * Consumers may fetch from any replica, but followers can only fetch from the leader.
    */
   def fetchMessages(timeout: Long,
                     replicaId: Int,
@@ -818,9 +831,9 @@ class ReplicaManager(val config: KafkaConfig,
                     fetchInfos: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota,
                     responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
-                    isolationLevel: IsolationLevel) {
+                    isolationLevel: IsolationLevel,
+                    clientMetadata: Option[ClientMetadata]) {
     val isFromFollower = Request.isValidBrokerId(replicaId)
-    val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId
 
     val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
       FetchLogEnd
@@ -829,16 +842,16 @@ class ReplicaManager(val config: KafkaConfig,
     else
       FetchHighWatermark
 
-
     def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
       val result = readFromLocalLog(
         replicaId = replicaId,
-        fetchOnlyFromLeader = fetchOnlyFromLeader,
+        fetchOnlyFromLeader = isFromFollower,
         fetchIsolation = fetchIsolation,
         fetchMaxBytes = fetchMaxBytes,
         hardMaxBytesLimit = hardMaxBytesLimit,
         readPartitionInfo = fetchInfos,
-        quota = quota)
+        quota = quota,
+        clientMetadata = clientMetadata)
       if (isFromFollower) updateFollowerFetchState(replicaId, result)
       else result
     }
@@ -849,23 +862,37 @@ class ReplicaManager(val config: KafkaConfig,
     var bytesReadable: Long = 0
     var errorReadingData = false
     val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
+    var anyPartitionsNeedHwUpdate = false
     logReadResults.foreach { case (topicPartition, logReadResult) =>
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
       bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
       logReadResultMap.put(topicPartition, logReadResult)
+      if (isFromFollower && logReadResult.followerNeedsHwUpdate) {
+        anyPartitionsNeedHwUpdate = true
+      }
     }
 
+    // Wrap the given callback function with another function that will update the HW for the remote follower
+    val updateHwAndThenCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit =
+      (fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]) => {
+        fetchPartitionData.foreach {
+          case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
+        }
+        responseCallback(fetchPartitionData)
+      }
+
     // respond immediately if 1) fetch request does not want to wait
     //                        2) fetch request does not require any data
     //                        3) has enough data to respond
     //                        4) some error happens while reading data
-    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
+    //                        5) all the requested partitions need HW update
+    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
         tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
-          result.lastStableOffset, result.info.abortedTransactions)
+          result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
       }
-      responseCallback(fetchPartitionData)
+      updateHwAndThenCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@@ -875,9 +902,10 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
+      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
         fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
-      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
+      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
+        updateHwAndThenCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
       val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
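
fetchMessages now threads an Option[ClientMetadata] down to the read path so that consumer fetches can be distinguished from follower replication and, on the leader, mapped to a preferred read replica, while the wrapped callback propagates the high watermark back to followers without waiting for the purgatory timeout. A hypothetical caller sketch using the signature above; the topic, listener name, timeout and helper names are illustrative values, not taken from the patch:

    import java.net.InetAddress
    import java.util.Optional
    import kafka.server.QuotaFactory.UnboundedQuota
    import kafka.server.{FetchPartitionData, ReplicaManager}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
    import org.apache.kafka.common.requests.{FetchRequest, IsolationLevel}
    import org.apache.kafka.common.security.auth.KafkaPrincipal

    def rackAwareConsumerFetch(replicaManager: ReplicaManager): Unit = {
      val tp = new TopicPartition("my-topic", 0)
      // Example client metadata: rack id, client id, address, principal and listener name
      val metadata = new DefaultClientMetadata("rack-a", "client-id",
        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "PLAINTEXT")

      def onFetchComplete(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit =
        responses.foreach { case (partition, data) =>
          println(s"$partition preferred read replica: ${data.preferredReadReplica}")
        }

      replicaManager.fetchMessages(
        timeout = 500L,
        replicaId = -1,                  // -1 identifies an ordinary consumer, not a follower
        fetchMinBytes = 1,
        fetchMaxBytes = 1024 * 1024,
        hardMaxBytesLimit = false,
        fetchInfos = Seq(tp -> new FetchRequest.PartitionData(0L, 0L, 1024 * 1024, Optional.empty())),
        quota = UnboundedQuota,
        responseCallback = onFetchComplete,
        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
        clientMetadata = Some(metadata))
    }
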
@@ -898,7 +926,8 @@ class ReplicaManager(val config: KafkaConfig,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
-                       quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
+                       quota: ReplicaQuota,
+                       clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset
@@ -917,35 +946,64 @@ class ReplicaManager(val config: KafkaConfig,
         val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
         val fetchTimeMs = time.milliseconds
 
-        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
-        val readInfo = partition.readRecords(
-          fetchOffset = fetchInfo.fetchOffset,
-          currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
-          maxBytes = adjustedMaxBytes,
-          fetchIsolation = fetchIsolation,
-          fetchOnlyFromLeader = fetchOnlyFromLeader,
-          minOneMessage = minOneMessage)
-
-        val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
-          // If the partition is being throttled, simply return an empty set.
-          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
-        } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
-          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
-          // progress in such cases and don't need to report a `RecordTooLargeException`
-          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+        // If we are the leader, determine the preferred read-replica
+        val preferredReadReplica = clientMetadata.flatMap(
+          metadata => findPreferredReadReplica(tp, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))
+
+        if (preferredReadReplica.isDefined) {
+          replicaSelectorOpt.foreach{ selector =>
+            debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
+              s"${preferredReadReplica.get} for $clientMetadata")
+          }
+          // If a preferred read-replica is set, skip the read
+          val offsetSnapshot: LogOffsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false)
+          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+            highWatermark = offsetSnapshot.highWatermark.messageOffset,
+            leaderLogStartOffset = offsetSnapshot.logStartOffset,
+            leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
+            followerLogStartOffset = followerLogStartOffset,
+            fetchTimeMs = -1L,
+            readSize = 0,
+            lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
+            preferredReadReplica = preferredReadReplica,
+            exception = None)
         } else {
-          readInfo.fetchedData
-        }
+          // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
+          val readInfo: LogReadInfo = partition.readRecords(
+            fetchOffset = fetchInfo.fetchOffset,
+            currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
+            maxBytes = adjustedMaxBytes,
+            fetchIsolation = fetchIsolation,
+            fetchOnlyFromLeader = fetchOnlyFromLeader,
+            minOneMessage = minOneMessage)
+
+          // Check if the HW known to the follower is behind the actual HW
+          val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
+            .exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
+
+          val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
+            // If the partition is being throttled, simply return an empty set.
+            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+          } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
+            // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
+            // progress in such cases and don't need to report a `RecordTooLargeException`
+            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+          } else {
+            readInfo.fetchedData
+          }
 
-        LogReadResult(info = fetchDataInfo,
-                      highWatermark = readInfo.highWatermark,
-                      leaderLogStartOffset = readInfo.logStartOffset,
-                      leaderLogEndOffset = readInfo.logEndOffset,
-                      followerLogStartOffset = followerLogStartOffset,
-                      fetchTimeMs = fetchTimeMs,
-                      readSize = adjustedMaxBytes,
-                      lastStableOffset = Some(readInfo.lastStableOffset),
-                      exception = None)
+          LogReadResult(info = fetchDataInfo,
+            highWatermark = readInfo.highWatermark,
+            leaderLogStartOffset = readInfo.logStartOffset,
+            leaderLogEndOffset = readInfo.logEndOffset,
+            followerLogStartOffset = followerLogStartOffset,
+            fetchTimeMs = fetchTimeMs,
+            readSize = adjustedMaxBytes,
+            lastStableOffset = Some(readInfo.lastStableOffset),
+            preferredReadReplica = preferredReadReplica,
+            followerNeedsHwUpdate = followerNeedsHwUpdate,
+            exception = None)
+        }
       } catch {
         // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
         // is supposed to indicate un-expected failure of a broker in handling a fetch request
@@ -1001,6 +1059,59 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
+    * Using the configured [[ReplicaSelector]], determine the preferred read replica for a partition given the
+    * client metadata, the requested offset, and the current set of replicas. If the preferred read replica is the
+    * leader, return None
+    */
+  def findPreferredReadReplica(tp: TopicPartition,
+                               clientMetadata: ClientMetadata,
+                               replicaId: Int,
+                               fetchOffset: Long,
+                               currentTimeMs: Long): Option[Int] = {
+    val partition = getPartitionOrException(tp, expectLeader = false)
+
+    if (partition.isLeader) {
+      if (Request.isValidBrokerId(replicaId)) {
+        // Don't look up preferred for follower fetches via normal replication
+        Option.empty
+      } else {
+        replicaSelectorOpt.flatMap { replicaSelector =>
+          val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(tp, new ListenerName(clientMetadata.listenerName))
+          var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas
+            // Exclude replicas that don't have the requested offset (whether or not they're in the ISR)
+            .filter(replica => replica.logEndOffset >= fetchOffset)
+            .filter(replica => replica.logStartOffset <= fetchOffset)
+            .map(replica => new DefaultReplicaView(
+              replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
+              replica.logEndOffset,
+              currentTimeMs - replica.lastCaughtUpTimeMs
+            ))
+
+          if (partition.leaderReplicaIdOpt.isDefined) {
+            val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
+              .map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode()))
+              .map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L))
+              .get
+            replicaInfoSet ++= Set(leaderReplica)
+
+            val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
+            replicaSelector.select(tp, clientMetadata, partitionInfo).asScala
+              .filter(!_.endpoint.isEmpty)
+              // Even though the replica selector can return the leader, we don't want to send it out with the
+              // FetchResponse, so we exclude it here
+              .filter(!_.equals(leaderReplica))
+              .map(_.endpoint.id)
+          } else {
+            None
+          }
+        }
+      }
+    } else {
+      None
+    }
+  }
+
+  /**
    *  To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
    *  the quota is exceeded and the replica is not in sync.
    */
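
findPreferredReadReplica builds a view of the eligible replicas and delegates the actual choice to the pluggable ReplicaSelector, then filters the leader back out so that only genuine redirects are sent in the FetchResponse. A minimal sketch of a custom selector follows; the class is hypothetical and assumes select is the interface's only abstract method, which the reflective configure/close calls elsewhere in this patch suggest:

    import java.util.Optional
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}

    import scala.collection.JavaConverters._

    // Hypothetical selector, not shipped with this patch: pick the replica with the
    // smallest time-since-caught-up, falling back to the leader when nothing qualifies.
    class LeastLaggingReplicaSelector extends ReplicaSelector {
      override def select(topicPartition: TopicPartition,
                          clientMetadata: ClientMetadata,
                          partitionView: PartitionView): Optional[ReplicaView] = {
        val candidates = partitionView.replicas.asScala
        if (candidates.isEmpty)
          Optional.of(partitionView.leader)
        else
          Optional.of(candidates.minBy(_.timeSinceLastCaughtUpMs))
      }
    }
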
@@ -1424,6 +1535,15 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = {
+    nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match {
+      case Some(replica) => replica.updateLastSentHighWatermark(highWatermark)
+      case None =>
+        warn(s"While updating the HW for follower $followerId for partition $topicPartition, " +
+          s"the replica could not be found.")
+    }
+  }
+
   private def leaderPartitionsIterator: Iterator[Partition] =
     nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)
 
@@ -1516,6 +1636,7 @@ class ReplicaManager(val config: KafkaConfig,
     delayedElectLeaderPurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
+    replicaSelectorOpt.foreach(_.close)
     info("Shut down completely")
   }
 
@@ -1527,6 +1648,14 @@ class ReplicaManager(val config: KafkaConfig,
     new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats)
   }
 
+  protected def createReplicaSelector(): Option[ReplicaSelector] = {
+    config.replicaSelectorClassName.map { className =>
+      val tmpReplicaSelector: ReplicaSelector = CoreUtils.createObject[ReplicaSelector](className)
+      tmpReplicaSelector.configure(config.originals())
+      tmpReplicaSelector
+    }
+  }
+
   def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = {
     requestedEpochInfo.map { case (tp, partitionData) =>
       val epochEndOffset = getPartition(tp) match {
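
createReplicaSelector instantiates the selector reflectively from the broker configuration and hands it the raw config map; when the property is unset, replicaSelectorOpt stays None and reads keep the existing leader-only behaviour. A sketch of the broker-side wiring, where the selector class name is a placeholder and only KafkaConfig.ReplicaSelectorClassProp comes from this patch:

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")   // minimal required broker config
    // Any class implementing org.apache.kafka.common.replica.ReplicaSelector on the
    // broker classpath can be plugged in here; the name below is an example.
    props.put(KafkaConfig.ReplicaSelectorClassProp, "com.example.LeastLaggingReplicaSelector")
    val config = KafkaConfig.fromProps(props)
    // config.replicaSelectorClassName is now defined, so ReplicaManager will call
    // configure(config.originals()) on the selector at startup and close() on shutdown.
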
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 2d8fac1..d41bc5b 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -20,7 +20,8 @@ import java.util.Optional
 
 import scala.collection.Seq
 
-import kafka.cluster.Partition
+import kafka.cluster.{Partition, Replica}
+import kafka.log.LogOffsetSnapshot
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.FencedLeaderEpochException
 import org.apache.kafka.common.protocol.Errors
@@ -58,6 +59,7 @@ class DelayedFetchTest extends EasyMockSupport {
       fetchMetadata = fetchMetadata,
       replicaManager = replicaManager,
       quota = replicaQuota,
+      clientMetadata = None,
       responseCallback = callback)
 
     val partition: Partition = mock(classOf[Partition])
@@ -79,6 +81,89 @@ class DelayedFetchTest extends EasyMockSupport {
     assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
   }
 
+  def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
+    val topicPartition = new TopicPartition("topic", 0)
+    val fetchOffset = 500L
+    val logStartOffset = 0L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      delayMs = 500,
+      fetchMetadata = fetchMetadata,
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      clientMetadata = None,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+      .andReturn(partition)
+    EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true))
+      .andReturn(
+        LogOffsetSnapshot(
+          logStartOffset = 0,
+          logEndOffset = new LogOffsetMetadata(500L),
+          highWatermark = new LogOffsetMetadata(480L),
+          lastStableOffset = new LogOffsetMetadata(400L)))
+
+    expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo)
+
+    val follower = new Replica(replicaId, topicPartition)
+    followerHW.foreach(hw => {
+      follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L)
+      follower.updateLastSentHighWatermark(hw)
+    })
+    EasyMock.expect(partition.getReplica(replicaId))
+        .andReturn(Some(follower))
+
+    replayAll()
+    checkResult.apply(delayedFetch)
+  }
+
+  @Test
+  def testCompleteWhenFollowerLaggingHW(): Unit = {
+    // No HW from the follower, should complete
+    resetAll
+    checkCompleteWhenFollowerLaggingHW(None, delayedFetch => {
+      assertTrue(delayedFetch.tryComplete())
+      assertTrue(delayedFetch.isCompleted)
+    })
+
+    // A higher HW from the follower (shouldn't actually be possible)
+    resetAll
+    checkCompleteWhenFollowerLaggingHW(Some(500), delayedFetch => {
+      assertFalse(delayedFetch.tryComplete())
+      assertFalse(delayedFetch.isCompleted)
+    })
+
+    // An equal HW from follower
+    resetAll
+    checkCompleteWhenFollowerLaggingHW(Some(480), delayedFetch => {
+      assertFalse(delayedFetch.tryComplete())
+      assertFalse(delayedFetch.isCompleted)
+    })
+
+    // A lower HW from follower, should complete the fetch
+    resetAll
+    checkCompleteWhenFollowerLaggingHW(Some(470), delayedFetch => {
+      assertTrue(delayedFetch.tryComplete())
+      assertTrue(delayedFetch.isCompleted)
+    })
+  }
+
   private def buildFetchMetadata(replicaId: Int,
                                  topicPartition: TopicPartition,
                                  fetchStatus: FetchPartitionStatus): FetchMetadata = {
@@ -103,10 +188,38 @@ class DelayedFetchTest extends EasyMockSupport {
       fetchMaxBytes = maxBytes,
       hardMaxBytesLimit = false,
       readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
+      clientMetadata = None,
       quota = replicaQuota))
       .andReturn(Seq((topicPartition, buildReadResultWithError(error))))
   }
 
+  private def expectReadFromReplica(replicaId: Int,
+                                    topicPartition: TopicPartition,
+                                    fetchPartitionData: FetchRequest.PartitionData): Unit = {
+    val result = LogReadResult(
+      exception = None,
+      info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+      highWatermark = -1L,
+      leaderLogStartOffset = -1L,
+      leaderLogEndOffset = -1L,
+      followerLogStartOffset = -1L,
+      fetchTimeMs = -1L,
+      readSize = -1,
+      lastStableOffset = None)
+
+
+    EasyMock.expect(replicaManager.readFromLocalLog(
+      replicaId = replicaId,
+      fetchOnlyFromLeader = true,
+      fetchIsolation = FetchLogEnd,
+      fetchMaxBytes = maxBytes,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
+      clientMetadata = None,
+      quota = replicaQuota))
+      .andReturn(Seq((topicPartition, result))).anyTimes()
+  }
+
   private def buildReadResultWithError(error: Errors): LogReadResult = {
     LogReadResult(
       exception = Some(error.exception),
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 4f2d319..33c0d95 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -242,9 +242,11 @@ class PartitionTest {
     val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
 
     def assertSnapshotError(expectedError: Errors, currentLeaderEpoch: Optional[Integer]): Unit = {
-      partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = true) match {
-        case Left(_) => assertEquals(Errors.NONE, expectedError)
-        case Right(error) => assertEquals(expectedError, error)
+      try {
+        partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true)
+        assertEquals(Errors.NONE, expectedError)
+      } catch {
+        case error: ApiException => assertEquals(expectedError, Errors.forException(error))
       }
     }
 
@@ -262,9 +264,11 @@ class PartitionTest {
     def assertSnapshotError(expectedError: Errors,
                             currentLeaderEpoch: Optional[Integer],
                             fetchOnlyLeader: Boolean): Unit = {
-      partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader) match {
-        case Left(_) => assertEquals(expectedError, Errors.NONE)
-        case Right(error) => assertEquals(expectedError, error)
+      try {
+        partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader)
+        assertEquals(Errors.NONE, expectedError)
+      } catch {
+        case error: ApiException => assertEquals(expectedError, Errors.forException(error))
       }
     }
 
@@ -1039,6 +1043,7 @@ class PartitionTest {
     assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs)
     assertEquals(6L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
+
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a796d6f..c584fc0 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3694,6 +3694,37 @@ class LogTest {
   }
 
   @Test
+  def testOffsetSnapshot() {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+
+    // append a few records
+    appendAsFollower(log, MemoryRecords.withRecords(CompressionType.NONE,
+      new SimpleRecord("a".getBytes),
+      new SimpleRecord("b".getBytes),
+      new SimpleRecord("c".getBytes)), 5)
+
+
+    log.highWatermark = 2L
+    var offsets: LogOffsetSnapshot = log.offsetSnapshot
+    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertFalse(offsets.highWatermark.messageOffsetOnly)
+
+    offsets = log.offsetSnapshot
+    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertFalse(offsets.highWatermark.messageOffsetOnly)
+
+    try {
+      log.highWatermark = 100L
+      offsets = log.offsetSnapshot
+      fail("Should have thrown")
+    } catch {
+      case e: OffsetOutOfRangeException => // pass
+      case _ => fail("Should have seen OffsetOutOfRangeException")
+    }
+  }
+
+  @Test
   def testLastStableOffsetWithMixedProducerData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 7d3a58b..0363963 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -205,7 +205,7 @@ class FetchRequestTest extends BaseRequestTest {
       Seq(topicPartition))).build()
     val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest)
     val partitionData = fetchResponse.responseData.get(topicPartition)
-    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, partitionData.error)
+    assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionData.error)
   }
 
   @Test
@@ -238,8 +238,8 @@ class FetchRequestTest extends BaseRequestTest {
 
     // Check follower error codes
     val followerId = TestUtils.findFollowerId(topicPartition, servers)
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty())
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch))
+    assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
+    assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.of(secondLeaderEpoch))
     assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 41e7c38..c26c3fd 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -50,6 +50,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
 import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.replica.ClientMetadata
 import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
 
@@ -464,14 +465,15 @@ class KafkaApisTest {
 
     replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
       anyObject[Seq[(TopicPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
-      anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel])
+      anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel],
+      anyObject[Option[ClientMetadata]])
     expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
       def answer: Unit = {
         val callback = getCurrentArguments.apply(7).asInstanceOf[(Seq[(TopicPartition, FetchPartitionData)] => Unit)]
         val records = MemoryRecords.withRecords(CompressionType.NONE,
           new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
         callback(Seq(tp -> new FetchPartitionData(Errors.NONE, hw, 0, records,
-          None, None)))
+          None, None, Option.empty)))
       }
     })
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9d28c1b..850f553 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -655,6 +655,7 @@ class KafkaConfigTest {
         case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaSelectorClassProp => // Ignore string
         case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 9005ec3..79457c0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -395,6 +395,7 @@ class ReplicaAlterLogDirsThreadTest {
       EasyMock.anyObject(),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {
@@ -629,6 +630,7 @@ class ReplicaAlterLogDirsThreadTest {
       EasyMock.anyObject(),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 48c23e4..fddceff 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -64,7 +64,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      clientMetadata = None)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
 
@@ -89,7 +90,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      clientMetadata = None)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -113,7 +115,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      clientMetadata = None)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
@@ -137,7 +140,8 @@ class ReplicaManagerQuotasTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = quota)
+      quota = quota,
+      clientMetadata = None)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
       fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
 
@@ -167,6 +171,7 @@ class ReplicaManagerQuotasTest {
 
       EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int]))
         .andReturn(!isReplicaInSync).anyTimes()
+      EasyMock.expect(partition.getReplica(1)).andReturn(None)
       EasyMock.replay(replicaManager, partition)
 
       val tp = new TopicPartition("t1", 0)
@@ -179,9 +184,10 @@ class ReplicaManagerQuotasTest {
         fetchIsolation = FetchLogEnd,
         isFromFollower = true,
         replicaId = 1,
-        fetchPartitionStatus = List((tp, fetchPartitionStatus)))
+        fetchPartitionStatus = List((tp, fetchPartitionStatus))
+      )
       new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager,
-        quota = null, responseCallback = null) {
+        quota = null, clientMetadata = None, responseCallback = null) {
         override def forceComplete(): Boolean = true
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 87f2894..6091503 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -18,10 +18,13 @@
 package kafka.server
 
 import java.io.File
-import java.util.{Optional, Properties}
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.net.InetAddress
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Optional, Properties}
 
+import kafka.api.Request
+import kafka.cluster.BrokerEndPoint
 import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
@@ -29,16 +32,22 @@ import kafka.cluster.BrokerEndPoint
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
+import kafka.utils.TestUtils.createBroker
 import kafka.utils.timer.MockTimer
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, IsolationLevel, LeaderAndIsrRequest}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
+import org.apache.kafka.common.replica.ClientMetadata
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.zookeeper.data.Stat
@@ -179,12 +188,6 @@ class ReplicaManagerTest {
         assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error)
       }
 
-      // Fetch some messages
-      val fetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0),
-        new PartitionData(0, 0, 100000, Optional.empty()),
-        minBytes = 100000)
-      assertFalse(fetchResult.isFired)
-
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -193,7 +196,6 @@ class ReplicaManagerTest {
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
       assertTrue(appendResult.isFired)
-      assertTrue(fetchResult.isFired)
     } finally {
       rm.shutdown(checkpointHW = false)
     }
@@ -515,7 +517,8 @@ class ReplicaManagerTest {
         fetchInfos = Seq(tp -> validFetchPartitionData),
         quota = UnboundedQuota,
         isolationLevel = IsolationLevel.READ_UNCOMMITTED,
-        responseCallback = callback
+        responseCallback = callback,
+        clientMetadata = None
       )
 
       assertTrue(successfulFetch.isDefined)
@@ -537,7 +540,8 @@ class ReplicaManagerTest {
         fetchInfos = Seq(tp -> invalidFetchPartitionData),
         quota = UnboundedQuota,
         isolationLevel = IsolationLevel.READ_UNCOMMITTED,
-        responseCallback = callback
+        responseCallback = callback,
+        clientMetadata = None
       )
 
       assertTrue(successfulFetch.isDefined)
@@ -617,7 +621,8 @@ class ReplicaManagerTest {
           tp1 -> new PartitionData(1, 0, 100000, Optional.empty())),
         quota = UnboundedQuota,
         responseCallback = fetchCallback,
-        isolationLevel = IsolationLevel.READ_UNCOMMITTED
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        clientMetadata = None
       )
       val tp0Log = replicaManager.localLog(tp0)
       assertTrue(tp0Log.isDefined)
@@ -678,6 +683,157 @@ class ReplicaManagerTest {
     EasyMock.verify(mockLogMgr)
   }
 
+  @Test
+  def testReplicaSelector(): Unit = {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val controllerId = 0
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true)
+
+    val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
+
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.makeLeader(
+      controllerId,
+      leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
+      correlationId,
+      offsetCheckpoints
+    )
+
+    val tp0 = new TopicPartition(topic, 0)
+
+    val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+      InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+    // We expect to select the leader, which means we return None
+    val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica(
+      tp0, metadata, Request.OrdinaryConsumerId, 1L, System.currentTimeMillis)
+    assertFalse(preferredReadReplica.isDefined)
+  }
+
+  @Test
+  def testPreferredReplicaAsFollower(): Unit = {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true)
+
+    val brokerList = Seq[Integer](0, 1).asJava
+
+    val tp0 = new TopicPartition(topic, 0)
+
+    replicaManager.createPartition(new TopicPartition(topic, 0))
+
+    // Make this replica the follower
+    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      collection.immutable.Map(new TopicPartition(topic, 0) ->
+        new LeaderAndIsrRequest.PartitionState(0, 1, 1, brokerList, 0, brokerList, false)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+
+    val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+      InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+    val consumerResult = fetchAsConsumer(replicaManager, tp0,
+      new PartitionData(0, 0, 100000, Optional.empty()),
+      clientMetadata = Some(metadata))
+
+    // Fetch from follower succeeds
+    assertTrue(consumerResult.isFired)
+
+    // But only leader will compute preferred replica
+    assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
+  }
+
+  @Test
+  def testPreferredReplicaAsLeader(): Unit = {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true)
+
+    val brokerList = Seq[Integer](0, 1).asJava
+
+    val tp0 = new TopicPartition(topic, 0)
+
+    replicaManager.createPartition(new TopicPartition(topic, 0))
+
+    // Make this replica the leader
+    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      collection.immutable.Map(new TopicPartition(topic, 0) ->
+        new LeaderAndIsrRequest.PartitionState(0, 0, 1, brokerList, 0, brokerList, false)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+
+    val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+      InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+    val consumerResult = fetchAsConsumer(replicaManager, tp0,
+      new PartitionData(0, 0, 100000, Optional.empty()),
+      clientMetadata = Some(metadata))
+
+    // Fetch from follower succeeds
+    assertTrue(consumerResult.isFired)
+
+    // Returns a preferred replica (should just be the leader, which is None)
+    assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
+  }
+
+  @Test(expected = classOf[ClassNotFoundException])
+  def testUnknownReplicaSelector(): Unit = {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
+  }
+
+  @Test
+  def testDefaultReplicaSelector(): Unit = {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true)
+    assertFalse(replicaManager.replicaSelectorOpt.isDefined)
+  }
+
   /**
    * This method assumes that the test using created ReplicaManager calls
    * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
@@ -688,9 +844,11 @@ class ReplicaManagerTest {
                                                  followerBrokerId: Int,
                                                  leaderBrokerId: Int,
                                                  countDownLatch: CountDownLatch,
-                                                 expectTruncation: Boolean) : (ReplicaManager, LogManager) = {
+                                                 expectTruncation: Boolean,
+                                                 extraProps: Properties = new Properties()) : (ReplicaManager, LogManager) = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.asScala ++= extraProps.asScala
     val config = KafkaConfig.fromProps(props)
 
     // Setup mock local log to have leader epoch of 3 and offset of 10
@@ -749,9 +907,16 @@ class ReplicaManagerTest {
         .andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId)))
         .anyTimes
     }
+    EasyMock
+      .expect(metadataCache.getPartitionReplicaEndpoints(
+        EasyMock.anyObject(), EasyMock.anyObject()))
+      .andReturn(Map(
+        leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
+        followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap
+      )
+      .anyTimes()
     EasyMock.replay(metadataCache)
 
-
     val timer = new MockTimer
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)
@@ -860,16 +1025,18 @@ class ReplicaManagerTest {
                               partition: TopicPartition,
                               partitionData: PartitionData,
                               minBytes: Int = 0,
-                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = {
-    fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel)
+                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
+                              clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
+    fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
   }
 
   private def fetchAsFollower(replicaManager: ReplicaManager,
                               partition: TopicPartition,
                               partitionData: PartitionData,
                               minBytes: Int = 0,
-                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = {
-    fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel)
+                              isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
+                              clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
+    fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
   }
 
   private def fetchMessages(replicaManager: ReplicaManager,
@@ -877,7 +1044,8 @@ class ReplicaManagerTest {
                             partition: TopicPartition,
                             partitionData: PartitionData,
                             minBytes: Int,
-                            isolationLevel: IsolationLevel): CallbackResult[FetchPartitionData] = {
+                            isolationLevel: IsolationLevel,
+                            clientMetadata: Option[ClientMetadata]): CallbackResult[FetchPartitionData] = {
     val result = new CallbackResult[FetchPartitionData]()
     def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
       assertEquals(1, responseStatus.size)
@@ -895,7 +1063,9 @@ class ReplicaManagerTest {
       fetchInfos = Seq(partition -> partitionData),
       quota = UnboundedQuota,
       responseCallback = fetchCallback,
-      isolationLevel = isolationLevel)
+      isolationLevel = isolationLevel,
+      clientMetadata = clientMetadata
+    )
 
     result
   }
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index bae1171..bf5c20e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -178,7 +178,8 @@ class SimpleFetchTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = UnboundedQuota).find(_._1 == topicPartition)
+      quota = UnboundedQuota,
+      clientMetadata = None).find(_._1 == topicPartition)
     val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
     assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
       new SimpleRecord(firstReadRecord))
@@ -190,7 +191,8 @@ class SimpleFetchTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       readPartitionInfo = fetchInfo,
-      quota = UnboundedQuota).find(_._1 == topicPartition)
+      quota = UnboundedQuota,
+      clientMetadata = None).find(_._1 == topicPartition)
 
     val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
     assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index d85c9dc..3aeed90 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -86,6 +86,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             jaas_override_variables     A dict of variables to be used in the jaas.conf template file
             kafka_opts_override         Override parameters of the KAFKA_OPTS environment variable
             client_prop_file_override   Override client.properties file used by the consumer
+            consumer_properties         A dict of values to pass in as --consumer-property key=value
         """
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=ConsoleConsumer.PERSISTENT_ROOT)
@@ -208,7 +209,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
 
         if self.consumer_properties is not None:
             for k, v in self.consumer_properties.items():
-                cmd += "--consumer_properties %s=%s" % (k, v)
+                cmd += " --consumer-property %s=%s" % (k, v)
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index bc99d96..eb0ddfd 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -94,7 +94,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
                  jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
-                 listener_security_config=ListenerSecurityConfig()):
+                 listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides={}):
         """
         :param context: test context
         :param ZookeeperService zk:
@@ -129,6 +129,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             self.server_prop_overides = []
         else:
             self.server_prop_overides = server_prop_overides
+        if per_node_server_prop_overrides is None:
+            self.per_node_server_prop_overrides = {}
+        else:
+            self.per_node_server_prop_overrides = per_node_server_prop_overrides
         self.log_level = "DEBUG"
         self.zk_chroot = zk_chroot
         self.listener_security_config = listener_security_config
@@ -295,6 +299,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         for prop in self.server_prop_overides:
             override_configs[prop[0]] = prop[1]
 
+        for prop in self.per_node_server_prop_overrides.get(self.idx(node), []):
+            override_configs[prop[0]] = prop[1]
+
         #update template configs with test override configs
         configs.update(override_configs)
 
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index cf8cbc3..c5b747d 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -27,9 +27,10 @@ class JmxMixin(object):
     - we assume the service using JmxMixin also uses KafkaPathResolverMixin
     - this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted
     """
-    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, root="/mnt"):
+    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_poll_ms=1000, root="/mnt"):
         self.jmx_object_names = jmx_object_names
         self.jmx_attributes = jmx_attributes or []
+        self.jmx_poll_ms = jmx_poll_ms
         self.jmx_port = 9192
 
         self.started = [False] * num_nodes
@@ -71,7 +72,7 @@ class JmxMixin(object):
         if use_jmxtool_version <= V_0_11_0_0:
             use_jmxtool_version = DEV_BRANCH
         cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), self.jmx_class_name())
-        cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
+        cmd += "--reporting-interval %d --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (self.jmx_poll_ms, self.jmx_port)
         cmd += " --wait"
         for jmx_object_name in self.jmx_object_names:
             cmd += " --object-name %s" % jmx_object_name
@@ -83,7 +84,7 @@ class JmxMixin(object):
 
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=30, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
     def _jmx_has_output(self, node):
diff --git a/tests/kafkatest/tests/client/truncation_test.py b/tests/kafkatest/tests/client/truncation_test.py
index 8269de7..523bcbc 100644
--- a/tests/kafkatest/tests/client/truncation_test.py
+++ b/tests/kafkatest/tests/client/truncation_test.py
@@ -21,7 +21,6 @@ from kafkatest.services.kafka import TopicPartition
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
 
 
-
 class TruncationTest(VerifiableConsumerTest):
     TOPIC = "test_topic"
     NUM_PARTITIONS = 1

