kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9261; Client should handle unavailable leader metadata (#7770)
Date Wed, 05 Feb 2020 17:21:46 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f210b7a  KAFKA-9261; Client should handle unavailable leader metadata (#7770)
f210b7a is described below

commit f210b7abbafd82c4b8491169bbb76458b85d1591
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Feb 5 09:13:11 2020 -0800

    KAFKA-9261; Client should handle unavailable leader metadata (#7770)
    
    The client caches metadata fetched from Metadata requests. Previously, each metadata response overwrote all of the metadata from the previous one, so we could rely on the expectation that the broker only returned the leaderId for a partition if it had connection information available. This behavior changed with KIP-320 since having the leader epoch allows the client to filter out partition metadata which is known to be stale. However, because of this, we can no longer rely on the requ [...]
    
    Fixing this issue was unfortunately not straightforward because the cache was built to maintain references to broker metadata through the `Node` object at the partition level. In order to keep the state consistent, each `Node` reference would need to be updated based on the new broker metadata. Instead of doing that, this patch changes the cache so that it is structured more closely with the Metadata response schema. Broker node information is maintained at the top level in a single c [...]
    
    Note that one of the side benefits of the refactor here is that we virtually eliminate one of the hotspots in Metadata request handling in `MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`). The only reason this was expensive was because we had to build a new collection for the `Node` representations of each of the replica lists. This information was doomed to just get discarded on serialization, so the whole effort was wasteful. Now, we work with the lower [...]
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
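
    [Editor's note] For readers skimming the diff, the following is a condensed, illustrative
    sketch of the cache shape this patch introduces. Field and method names follow the
    MetadataCache.java changes below, but this is not the actual class: broker connection
    info lives in a single id-keyed map at the top level, and per-partition metadata refers
    to brokers only by id, so an unknown leader resolves to an empty Optional instead of a
    dangling Node reference.

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.MetadataResponse;

    import java.util.Map;
    import java.util.Optional;

    // Illustrative sketch only; see the real MetadataCache diff below.
    class MetadataCacheSketch {
        // Broker connection info is kept once, keyed by broker id ...
        private final Map<Integer, Node> nodes;
        // ... while partition metadata mirrors the Metadata response schema and
        // holds broker ids rather than Node references.
        private final Map<TopicPartition, MetadataResponse.PartitionMetadata> metadataByPartition;

        MetadataCacheSketch(Map<Integer, Node> nodes,
                            Map<TopicPartition, MetadataResponse.PartitionMetadata> metadataByPartition) {
            this.nodes = nodes;
            this.metadataByPartition = metadataByPartition;
        }

        // A leader id with no known connection info resolves to an empty Optional
        // rather than a null or "no node" sentinel.
        Optional<Node> nodeById(int id) {
            return Optional.ofNullable(nodes.get(id));
        }

        Optional<MetadataResponse.PartitionMetadata> partitionMetadata(TopicPartition tp) {
            return Optional.ofNullable(metadataByPartition.get(tp));
        }
    }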
---
 .../java/org/apache/kafka/clients/Metadata.java    | 113 +++++++------
 .../org/apache/kafka/clients/MetadataCache.java    | 105 ++++--------
 .../org/apache/kafka/clients/NetworkClient.java    |   2 +-
 .../admin/internals/MetadataOperationContext.java  |   4 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   4 +-
 .../kafka/clients/consumer/MockConsumer.java       |   5 +-
 .../kafka/clients/consumer/OffsetAndMetadata.java  |   4 +-
 .../consumer/internals/ConsumerCoordinator.java    |   6 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  55 +++---
 .../consumer/internals/SubscriptionState.java      |   8 +-
 .../main/java/org/apache/kafka/common/Cluster.java |  21 +--
 .../kafka/common/requests/MetadataResponse.java    | 184 ++++++++++-----------
 .../apache/kafka/common/requests/RequestUtils.java |   9 +-
 .../apache/kafka/clients/MetadataCacheTest.java    |  85 ++++++++++
 .../org/apache/kafka/clients/MetadataTest.java     | 115 ++++++++++---
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  51 +++---
 .../internals/ConsumerCoordinatorTest.java         |   5 +-
 .../consumer/internals/ConsumerMetadataTest.java   |   3 +-
 .../clients/consumer/internals/FetcherTest.java    |  37 +++--
 .../internals/OffsetForLeaderEpochClientTest.java  |   8 +-
 .../consumer/internals/SubscriptionStateTest.java  |  47 +++---
 .../kafka/common/requests/RequestResponseTest.java |  26 +--
 .../test/java/org/apache/kafka/test/TestUtils.java |  15 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   8 +-
 .../main/scala/kafka/server/MetadataCache.scala    |  89 ++++++----
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |  23 ++-
 .../admin/TopicCommandWithAdminClientTest.scala    |   2 +-
 .../server/AbstractCreateTopicsRequestTest.scala   |   4 +-
 .../unit/kafka/server/MetadataCacheTest.scala      |  33 ++--
 .../unit/kafka/server/MetadataRequestTest.scala    |  43 +++--
 30 files changed, 635 insertions(+), 479 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 82c1b07..72f2d04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -19,14 +19,12 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -43,10 +41,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
@@ -147,11 +146,16 @@ public class Metadata implements Closeable {
     }
 
     /**
-     * Request an update for the partition metadata iff the given leader epoch is at newer than the last seen leader epoch
+     * Request an update for the partition metadata iff the given leader epoch is newer than the last seen leader epoch
      */
     public synchronized boolean updateLastSeenEpochIfNewer(TopicPartition topicPartition, int leaderEpoch) {
         Objects.requireNonNull(topicPartition, "TopicPartition cannot be null");
-        return updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch -> leaderEpoch > oldEpoch, true);
+        if (leaderEpoch < 0)
+            throw new IllegalArgumentException("Invalid leader epoch " + leaderEpoch + " (must be non-negative)");
+
+        boolean updated = updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch -> leaderEpoch > oldEpoch);
+        this.needUpdate = this.needUpdate || updated;
+        return updated;
     }
 
 
@@ -165,21 +169,16 @@ public class Metadata implements Closeable {
      * @param topicPartition       topic+partition to update the epoch for
      * @param epoch                the new epoch
      * @param epochTest            a predicate to determine if the old epoch should be replaced
-     * @param setRequestUpdateFlag sets the "needUpdate" flag to true if the epoch is updated
      * @return true if the epoch was updated, false otherwise
      */
     private synchronized boolean updateLastSeenEpoch(TopicPartition topicPartition,
                                                      int epoch,
-                                                     Predicate<Integer> epochTest,
-                                                     boolean setRequestUpdateFlag) {
+                                                     Predicate<Integer> epochTest) {
         Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
         log.trace("Determining if we should replace existing epoch {} with new epoch {}", oldEpoch, epoch);
         if (oldEpoch == null || epochTest.test(oldEpoch)) {
             log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch, epoch, topicPartition);
             lastSeenLeaderEpochs.put(topicPartition, epoch);
-            if (setRequestUpdateFlag) {
-                this.needUpdate = true;
-            }
             return true;
         } else {
             log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition);
@@ -199,16 +198,29 @@ public class Metadata implements Closeable {
     /**
      * Return the cached partition info if it exists and a newer leader epoch isn't known about.
      */
-    public synchronized Optional<MetadataCache.PartitionInfoAndEpoch> partitionInfoIfCurrent(TopicPartition topicPartition) {
+    synchronized Optional<MetadataResponse.PartitionMetadata> partitionMetadataIfCurrent(TopicPartition topicPartition) {
         Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
+        Optional<MetadataResponse.PartitionMetadata> partitionMetadata = cache.partitionMetadata(topicPartition);
         if (epoch == null) {
             // old cluster format (no epochs)
-            return cache.getPartitionInfo(topicPartition);
+            return partitionMetadata;
         } else {
-            return cache.getPartitionInfoHavingEpoch(topicPartition, epoch);
+            return partitionMetadata.filter(metadata ->
+                    metadata.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH).equals(epoch));
         }
     }
 
+    public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
+        Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
+        if (!maybeMetadata.isPresent())
+            return new LeaderAndEpoch(Optional.empty(), Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)));
+
+        MetadataResponse.PartitionMetadata partitionMetadata = maybeMetadata.get();
+        Optional<Integer> leaderEpochOpt = partitionMetadata.leaderEpoch;
+        Optional<Node> leaderNodeOpt = partitionMetadata.leaderId.flatMap(cache::nodeById);
+        return new LeaderAndEpoch(leaderNodeOpt, leaderEpochOpt);
+    }
+
     public synchronized void bootstrap(List<InetSocketAddress> addresses) {
         this.needUpdate = true;
         this.updateVersion += 1;
@@ -245,7 +257,7 @@ public class Metadata implements Closeable {
         this.lastSuccessfulRefreshMs = now;
         this.updateVersion += 1;
 
-        String previousClusterId = cache.cluster().clusterResource().clusterId();
+        String previousClusterId = cache.clusterResource().clusterId();
 
         this.cache = handleMetadataResponse(response, topic -> retainTopic(topic.topic(), topic.isInternal(), now));
 
@@ -254,11 +266,11 @@ public class Metadata implements Closeable {
 
         this.lastSeenLeaderEpochs.keySet().removeIf(tp -> !retainTopic(tp.topic(), false, now));
 
-        String newClusterId = cache.cluster().clusterResource().clusterId();
+        String newClusterId = cache.clusterResource().clusterId();
         if (!Objects.equals(previousClusterId, newClusterId)) {
             log.info("Cluster ID: {}", newClusterId);
         }
-        clusterResourceListeners.onUpdate(cache.cluster().clusterResource());
+        clusterResourceListeners.onUpdate(cache.clusterResource());
 
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
@@ -289,7 +301,7 @@ public class Metadata implements Closeable {
     private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
                                                  Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
-        List<MetadataCache.PartitionInfoAndEpoch> partitions = new ArrayList<>();
+        List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             if (!topicsToRetain.test(metadata))
                 continue;
@@ -300,12 +312,12 @@ public class Metadata implements Closeable {
                 for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle
                     // the update to catch new epochs
-                    updatePartitionInfo(metadata.topic(), partitionMetadata,
-                            metadataResponse.hasReliableLeaderEpochs(), partitions::add);
+                    updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs())
+                        .ifPresent(partitions::add);
 
-                    if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
+                    if (partitionMetadata.error.exception() instanceof InvalidMetadataException) {
                         log.debug("Requesting metadata update for partition {} due to error {}",
-                                new TopicPartition(metadata.topic(), partitionMetadata.partition()), partitionMetadata.error());
+                                partitionMetadata.topicPartition, partitionMetadata.error);
                         requestUpdate();
                     }
                 }
@@ -315,37 +327,36 @@ public class Metadata implements Closeable {
             }
         }
 
-        return new MetadataCache(metadataResponse.clusterId(), new ArrayList<>(metadataResponse.brokers()), partitions,
+        return new MetadataCache(metadataResponse.clusterId(),
+                metadataResponse.brokersById(),
+                partitions,
                 metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
-                internalTopics, metadataResponse.controller());
+                internalTopics,
+                metadataResponse.controller());
     }
 
     /**
-     * Compute the correct PartitionInfo to cache for a topic+partition and pass to the given consumer.
+     * Compute the latest partition metadata to cache given ordering by leader epochs (if both
+     * available and reliable).
      */
-    private void updatePartitionInfo(String topic,
-                                     MetadataResponse.PartitionMetadata partitionMetadata,
-                                     boolean hasReliableLeaderEpoch,
-                                     Consumer<MetadataCache.PartitionInfoAndEpoch> partitionInfoConsumer) {
-        TopicPartition tp = new TopicPartition(topic, partitionMetadata.partition());
-
-        if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch().isPresent()) {
-            int newEpoch = partitionMetadata.leaderEpoch().get();
+    private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
+            MetadataResponse.PartitionMetadata partitionMetadata,
+            boolean hasReliableLeaderEpoch) {
+        TopicPartition tp = partitionMetadata.topicPartition;
+        if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
+            int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
-            if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch, false)) {
-                PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
-                partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info, newEpoch));
+            if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch)) {
+                return Optional.of(partitionMetadata);
             } else {
                 // Otherwise ignore the new metadata and use the previously cached info
-                cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer);
+                return cache.partitionMetadata(tp);
             }
         } else {
             // Handle old cluster formats as well as error responses where leader and epoch are missing
             lastSeenLeaderEpochs.remove(tp);
-            PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
-            partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info,
-                    RecordBatch.NO_PARTITION_LEADER_EPOCH));
+            return Optional.of(partitionMetadata.withoutLeaderEpoch());
         }
     }
 
@@ -491,23 +502,19 @@ public class Metadata implements Closeable {
         }
     }
 
-    public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
-        return partitionInfoIfCurrent(tp)
-                .map(infoAndEpoch -> {
-                    Node leader = infoAndEpoch.partitionInfo().leader();
-                    return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, Optional.of(infoAndEpoch.epoch()));
-                })
-                .orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp)));
-    }
-
+    /**
+     * Represents current leader state known in metadata. It is possible that we know the leader, but not the
+     * epoch if the metadata is received from a broker which does not support a sufficient Metadata API version.
+     * It is also possible that we know of the leader epoch, but not the leader when it is derived
+     * from an external source (e.g. a committed offset).
+     */
     public static class LeaderAndEpoch {
+        private static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new LeaderAndEpoch(Optional.empty(), Optional.empty());
 
-        public static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new LeaderAndEpoch(Node.noNode(), Optional.empty());
-
-        public final Node leader;
+        public final Optional<Node> leader;
         public final Optional<Integer> epoch;
 
-        public LeaderAndEpoch(Node leader, Optional<Integer> epoch) {
+        public LeaderAndEpoch(Optional<Node> leader, Optional<Integer> epoch) {
             this.leader = Objects.requireNonNull(leader);
             this.epoch = Objects.requireNonNull(epoch);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index e58da12..3230087 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -17,18 +17,18 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.MetadataResponse;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -39,18 +39,18 @@ import java.util.stream.Collectors;
  */
 public class MetadataCache {
     private final String clusterId;
-    private final List<Node> nodes;
+    private final Map<Integer, Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
     private final Node controller;
-    private final Map<TopicPartition, PartitionInfoAndEpoch> metadataByPartition;
+    private final Map<TopicPartition, MetadataResponse.PartitionMetadata> metadataByPartition;
 
     private Cluster clusterInstance;
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
-                  Collection<PartitionInfoAndEpoch> partitions,
+                  Map<Integer, Node> nodes,
+                  Collection<MetadataResponse.PartitionMetadata> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
                   Set<String> internalTopics,
@@ -58,14 +58,14 @@ public class MetadataCache {
         this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, null);
     }
 
-    MetadataCache(String clusterId,
-                  List<Node> nodes,
-                  Collection<PartitionInfoAndEpoch> partitions,
-                  Set<String> unauthorizedTopics,
-                  Set<String> invalidTopics,
-                  Set<String> internalTopics,
-                  Node controller,
-                  Cluster clusterInstance) {
+    private MetadataCache(String clusterId,
+                          Map<Integer, Node> nodes,
+                          Collection<MetadataResponse.PartitionMetadata> partitions,
+                          Set<String> unauthorizedTopics,
+                          Set<String> invalidTopics,
+                          Set<String> internalTopics,
+                          Node controller,
+                          Cluster clusterInstance) {
         this.clusterId = clusterId;
         this.nodes = nodes;
         this.unauthorizedTopics = unauthorizedTopics;
@@ -74,8 +74,8 @@ public class MetadataCache {
         this.controller = controller;
 
         this.metadataByPartition = new HashMap<>(partitions.size());
-        for (PartitionInfoAndEpoch p : partitions) {
-            this.metadataByPartition.put(new TopicPartition(p.partitionInfo().topic(), p.partitionInfo().partition()), p);
+        for (MetadataResponse.PartitionMetadata p : partitions) {
+            this.metadataByPartition.put(p.topicPartition, p);
         }
 
         if (clusterInstance == null) {
@@ -85,16 +85,12 @@ public class MetadataCache {
         }
     }
 
-    /**
-     * Return the cached PartitionInfo iff it was for the given epoch
-     */
-    Optional<PartitionInfoAndEpoch> getPartitionInfoHavingEpoch(TopicPartition topicPartition, int epoch) {
-        PartitionInfoAndEpoch infoAndEpoch = metadataByPartition.get(topicPartition);
-        return Optional.ofNullable(infoAndEpoch).filter(infoEpoch -> infoEpoch.epoch() == epoch);
+    Optional<MetadataResponse.PartitionMetadata> partitionMetadata(TopicPartition topicPartition) {
+        return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
-    Optional<PartitionInfoAndEpoch> getPartitionInfo(TopicPartition topicPartition) {
-        return Optional.ofNullable(metadataByPartition.get(topicPartition));
+    Optional<Node> nodeById(int id) {
+        return Optional.ofNullable(nodes.get(id));
     }
 
     Cluster cluster() {
@@ -105,25 +101,33 @@ public class MetadataCache {
         }
     }
 
+    ClusterResource clusterResource() {
+        return new ClusterResource(clusterId);
+    }
+
     private void computeClusterView() {
         List<PartitionInfo> partitionInfos = metadataByPartition.values()
                 .stream()
-                .map(PartitionInfoAndEpoch::partitionInfo)
+                .map(metadata -> MetadataResponse.toPartitionInfo(metadata, nodes))
                 .collect(Collectors.toList());
-        this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
+        this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos, unauthorizedTopics,
+                invalidTopics, internalTopics, controller);
     }
 
     static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
-        List<Node> nodes = new ArrayList<>();
+        Map<Integer, Node> nodes = new HashMap<>();
         int nodeId = -1;
-        for (InetSocketAddress address : addresses)
-            nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
+        for (InetSocketAddress address : addresses) {
+            nodes.put(nodeId, new Node(nodeId, address.getHostString(), address.getPort()));
+            nodeId--;
+        }
         return new MetadataCache(null, nodes, Collections.emptyList(),
-                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Cluster.bootstrap(addresses));
+                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
+                null, Cluster.bootstrap(addresses));
     }
 
     static MetadataCache empty() {
-        return new MetadataCache(null, Collections.emptyList(), Collections.emptyList(),
+        return new MetadataCache(null, Collections.emptyMap(), Collections.emptyList(),
                 Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Cluster.empty());
     }
 
@@ -137,43 +141,4 @@ public class MetadataCache {
                 '}';
     }
 
-    public static class PartitionInfoAndEpoch {
-        private final PartitionInfo partitionInfo;
-        private final int epoch;
-
-        PartitionInfoAndEpoch(PartitionInfo partitionInfo, int epoch) {
-            this.partitionInfo = partitionInfo;
-            this.epoch = epoch;
-        }
-
-        public PartitionInfo partitionInfo() {
-            return partitionInfo;
-        }
-
-        public int epoch() {
-            return epoch;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            PartitionInfoAndEpoch that = (PartitionInfoAndEpoch) o;
-            return epoch == that.epoch &&
-                    Objects.equals(partitionInfo, that.partitionInfo);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partitionInfo, epoch);
-        }
-
-        @Override
-        public String toString() {
-            return "PartitionInfoAndEpoch{" +
-                    "partitionInfo=" + partitionInfo +
-                    ", epoch=" + epoch +
-                    '}';
-        }
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 61e8cd1..3060a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1055,7 +1055,7 @@ public class NetworkClient implements KafkaClient {
             // This could be a transient issue if listeners were added dynamically to brokers.
             List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
                 topicMetadata.partitionMetadata().stream()
-                    .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
+                    .filter(partitionMetadata -> partitionMetadata.error == Errors.LISTENER_NOT_FOUND)
                     .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
                 .collect(Collectors.toList());
             if (!missingListenerPartitions.isEmpty()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e6f4054..c05e5cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -83,8 +83,8 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
             for (PartitionMetadata pm : tm.partitionMetadata()) {
-                if (shouldRefreshMetadata(pm.error())) {
-                    throw pm.error().exception();
+                if (shouldRefreshMetadata(pm.error)) {
+                    throw pm.error.exception();
                 }
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2052af9..20da83f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1604,7 +1604,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
                     offset,
                     Optional.empty(), // This will ensure we skip validation
-                    this.metadata.leaderAndEpoch(partition));
+                    this.metadata.currentLeader(partition));
             this.subscriptions.seekUnvalidated(partition, newPosition);
         } finally {
             release();
@@ -1635,7 +1635,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             } else {
                 log.info("Seeking to offset {} for partition {}", offset, partition);
             }
-            Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.leaderAndEpoch(partition);
+            Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.currentLeader(partition);
             SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
                     offsetAndMetadata.offset(),
                     offsetAndMetadata.leaderEpoch(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index bd8b93c..67b4e9f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -37,6 +36,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -202,8 +202,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
                     if (assignment().contains(entry.getKey()) && rec.offset() >= position) {
                         results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
+                        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.empty(), rec.leaderEpoch());
                         SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
-                                rec.offset() + 1, rec.leaderEpoch(), new Metadata.LeaderAndEpoch(Node.noNode(), rec.leaderEpoch()));
+                                rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                     }
                 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index aa91e50..d6b3b94 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -94,7 +94,9 @@ public class OffsetAndMetadata implements Serializable {
      * @return the leader epoch or empty if not known
      */
     public Optional<Integer> leaderEpoch() {
-        return Optional.ofNullable(leaderEpoch);
+        if (leaderEpoch == null || leaderEpoch < 0)
+            return Optional.empty();
+        return Optional.of(leaderEpoch);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 6610bfb02..94209d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -766,10 +766,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 // it's possible that the partition is no longer assigned when the response is received,
                 // so we need to ignore seeking if that's the case
                 if (this.subscriptions.isAssigned(tp)) {
-                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
+                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
                     final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
-                        offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
-                        leaderAndEpoch);
+                            offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
+                            leaderAndEpoch);
 
                     this.subscriptions.seekUnvalidated(tp, position);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 32d3976..7890c9a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.clients.MetadataCache;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.ApiVersion;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -485,7 +485,7 @@ public class Fetcher<K, V> implements Closeable {
 
         // Validate each partition against the current leader and epoch
         subscriptions.assignedPartitions().forEach(topicPartition -> {
-            ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(topicPartition);
+            ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
             subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch);
         });
 
@@ -716,7 +716,7 @@ public class Fetcher<K, V> implements Closeable {
 
     private void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
         SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
-                offsetData.offset, offsetData.leaderEpoch, metadata.leaderAndEpoch(partition));
+                offsetData.offset, offsetData.leaderEpoch, metadata.currentLeader(partition));
         offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch));
         subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy);
     }
@@ -904,27 +904,26 @@ public class Fetcher<K, V> implements Closeable {
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
             Long offset = entry.getValue();
-            Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo = metadata.partitionInfoIfCurrent(tp);
-            if (!currentInfo.isPresent()) {
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
+
+            if (!leaderAndEpoch.leader.isPresent()) {
                 log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
                 metadata.requestUpdate();
                 partitionsToRetry.add(tp);
-            } else if (currentInfo.get().partitionInfo().leader() == null) {
-                log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
-                metadata.requestUpdate();
-                partitionsToRetry.add(tp);
-            } else if (client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
-                client.maybeThrowAuthFailure(currentInfo.get().partitionInfo().leader());
-
-                // The connection has failed and we need to await the blackout period before we can
-                // try again. No need to request a metadata update since the disconnect will have
-                // done so already.
-                log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
-                        currentInfo.get().partitionInfo().leader(), tp);
-                partitionsToRetry.add(tp);
             } else {
-                partitionDataMap.put(tp,
-                        new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch())));
+                Node leader = leaderAndEpoch.leader.get();
+                if (client.isUnavailable(leader)) {
+                    client.maybeThrowAuthFailure(leader);
+
+                    // The connection has failed and we need to await the blackout period before we can
+                    // try again. No need to request a metadata update since the disconnect will have
+                    // done so already.
+                    log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
+                            leader, tp);
+                    partitionsToRetry.add(tp);
+                } else {
+                    partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
+                }
             }
         }
         return regroupPartitionMapByNode(partitionDataMap);
@@ -1100,18 +1099,21 @@ public class Fetcher<K, V> implements Closeable {
 
         // Ensure the position has an up-to-date leader
         subscriptions.assignedPartitions().forEach(
-            tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp)));
+            tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.currentLeader(tp)));
 
         long currentTimeMs = time.milliseconds();
 
         for (TopicPartition partition : fetchablePartitions()) {
             // Use the preferred read replica if set, or the position's leader
             SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
-            Node node = selectReadReplica(partition, position.currentLeader.leader, currentTimeMs);
-
-            if (node == null || node.isEmpty()) {
+            Optional<Node> leaderOpt = position.currentLeader.leader;
+            if (!leaderOpt.isPresent()) {
                 metadata.requestUpdate();
-            } else if (client.isUnavailable(node)) {
+                continue;
+            }
+
+            Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
+            if (client.isUnavailable(node)) {
                 client.maybeThrowAuthFailure(node);
 
                 // If we try to send during the reconnect blackout window, then the request is just
@@ -1153,7 +1155,8 @@ public class Fetcher<K, V> implements Closeable {
             Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap) {
         return partitionMap.entrySet()
                 .stream()
-                .collect(Collectors.groupingBy(entry -> entry.getValue().currentLeader.leader,
+                .filter(entry -> entry.getValue().currentLeader.leader.isPresent())
+                .collect(Collectors.groupingBy(entry -> entry.getValue().currentLeader.leader.get(),
                         Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ec7c376..9200eb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,20 +16,19 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.util.ArrayList;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.PartitionStates;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -750,8 +749,7 @@ public class SubscriptionState {
                 return false;
             }
 
-            if (currentLeaderAndEpoch.equals(Metadata.LeaderAndEpoch.noLeaderOrEpoch())) {
-                // Ignore empty LeaderAndEpochs
+            if (!currentLeaderAndEpoch.leader.isPresent() && !currentLeaderAndEpoch.epoch.isPresent()) {
                 return false;
             }
 
@@ -985,7 +983,7 @@ public class SubscriptionState {
         final Metadata.LeaderAndEpoch currentLeader;
 
         FetchPosition(long offset) {
-            this(offset, Optional.empty(), new Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty()));
+            this(offset, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
         }
 
         public FetchPosition(long offset, Optional<Integer> offsetEpoch, Metadata.LeaderAndEpoch currentLeader) {
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c765cdc..a6bf763 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -122,18 +122,15 @@ public final class Cluster {
         Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
         for (PartitionInfo p : partitions) {
             tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
-            List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
-            if (partitionsForTopic == null) {
-                partitionsForTopic = new ArrayList<>();
-                tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
-            }
-            partitionsForTopic.add(p);
-            if (p.leader() != null) {
-                // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
-                // in the metadata response
-                List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
-                partitionsForNode.add(p);
-            }
+            tmpPartitionsByTopic.computeIfAbsent(p.topic(), topic -> new ArrayList<>()).add(p);
+
+            // The leader may not be known
+            if (p.leader() == null || p.leader().isEmpty())
+                continue;
+
+            // If it is known, its node information should be available
+            List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
+            partitionsForNode.add(p);
         }
 
         // Update the values of `tmpPartitionsByNode` to contain unmodifiable lists
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index e4e09a5..4fee563 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -19,10 +19,11 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.MetadataResponseData;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -40,6 +41,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -55,7 +57,7 @@ import java.util.stream.Collectors;
  */
 public class MetadataResponse extends AbstractResponse {
     public static final int NO_CONTROLLER_ID = -1;
-
+    public static final int NO_LEADER_ID = -1;
     public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
 
     private final MetadataResponseData data;
@@ -131,12 +133,13 @@ public class MetadataResponse extends AbstractResponse {
     public Cluster cluster() {
         Set<String> internalTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
+
         for (TopicMetadata metadata : topicMetadata()) {
             if (metadata.error == Errors.NONE) {
                 if (metadata.isInternal)
                     internalTopics.add(metadata.topic);
                 for (PartitionMetadata partitionMetadata : metadata.partitionMetadata) {
-                    partitions.add(partitionMetaToInfo(metadata.topic, partitionMetadata));
+                    partitions.add(toPartitionInfo(partitionMetadata, holder().brokers));
                 }
             }
         }
@@ -144,6 +147,24 @@ public class MetadataResponse extends AbstractResponse {
                 topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller());
     }
 
+    public static PartitionInfo toPartitionInfo(PartitionMetadata metadata, Map<Integer, Node> nodesById) {
+        return new PartitionInfo(metadata.topic(),
+                metadata.partition(),
+                metadata.leaderId.map(nodesById::get).orElse(null),
+                convertToNodeArray(metadata.replicaIds, nodesById),
+                convertToNodeArray(metadata.inSyncReplicaIds, nodesById),
+                convertToNodeArray(metadata.offlineReplicaIds, nodesById));
+    }
+
+    private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
+        return replicaIds.stream().map(replicaId -> {
+            Node node = nodesById.get(replicaId);
+            if (node == null)
+                return new Node(replicaId, "", -1);
+            return node;
+        }).toArray(Node[]::new);
+    }
+
     /**
      * Returns a 32-bit bitfield to represent authorized operations for this topic.
      */
@@ -162,19 +183,6 @@ public class MetadataResponse extends AbstractResponse {
         return data.clusterAuthorizedOperations();
     }
 
-    /**
-     * Transform a topic and PartitionMetadata into PartitionInfo.
-     */
-    public static PartitionInfo partitionMetaToInfo(String topic, PartitionMetadata partitionMetadata) {
-        return new PartitionInfo(
-                topic,
-                partitionMetadata.partition(),
-                partitionMetadata.leader(),
-                partitionMetadata.replicas().toArray(new Node[0]),
-                partitionMetadata.isr().toArray(new Node[0]),
-                partitionMetadata.offlineReplicas().toArray(new Node[0]));
-    }
-
     private Holder holder() {
         if (holder == null) {
             synchronized (data) {
@@ -190,6 +198,10 @@ public class MetadataResponse extends AbstractResponse {
      * @return the brokers
      */
     public Collection<Node> brokers() {
+        return holder().brokers.values();
+    }
+
+    public Map<Integer, Node> brokersById() {
         return holder().brokers;
     }
 
@@ -314,93 +326,78 @@ public class MetadataResponse extends AbstractResponse {
 
     // This is used to describe per-partition state in the MetadataResponse
     public static class PartitionMetadata {
-        private final Errors error;
-        private final int partition;
-        private final Node leader;
-        private final Optional<Integer> leaderEpoch;
-        private final List<Node> replicas;
-        private final List<Node> isr;
-        private final List<Node> offlineReplicas;
+        public final TopicPartition topicPartition;
+        public final Errors error;
+        public final Optional<Integer> leaderId;
+        public final Optional<Integer> leaderEpoch;
+        public final List<Integer> replicaIds;
+        public final List<Integer> inSyncReplicaIds;
+        public final List<Integer> offlineReplicaIds;
 
         public PartitionMetadata(Errors error,
-                                 int partition,
-                                 Node leader,
+                                 TopicPartition topicPartition,
+                                 Optional<Integer> leaderId,
                                  Optional<Integer> leaderEpoch,
-                                 List<Node> replicas,
-                                 List<Node> isr,
-                                 List<Node> offlineReplicas) {
+                                 List<Integer> replicaIds,
+                                 List<Integer> inSyncReplicaIds,
+                                 List<Integer> offlineReplicaIds) {
             this.error = error;
-            this.partition = partition;
-            this.leader = leader;
+            this.topicPartition = topicPartition;
+            this.leaderId = leaderId;
             this.leaderEpoch = leaderEpoch;
-            this.replicas = replicas;
-            this.isr = isr;
-            this.offlineReplicas = offlineReplicas;
-        }
-
-        public Errors error() {
-            return error;
+            this.replicaIds = replicaIds;
+            this.inSyncReplicaIds = inSyncReplicaIds;
+            this.offlineReplicaIds = offlineReplicaIds;
         }
 
         public int partition() {
-            return partition;
-        }
-
-        public int leaderId() {
-            return leader == null ? -1 : leader.id();
-        }
-
-        public Optional<Integer> leaderEpoch() {
-            return leaderEpoch;
-        }
-
-        public Node leader() {
-            return leader;
+            return topicPartition.partition();
         }
 
-        public List<Node> replicas() {
-            return replicas;
-        }
-
-        public List<Node> isr() {
-            return isr;
+        public String topic() {
+            return topicPartition.topic();
         }
 
-        public List<Node> offlineReplicas() {
-            return offlineReplicas;
+        public PartitionMetadata withoutLeaderEpoch() {
+            return new PartitionMetadata(error,
+                    topicPartition,
+                    leaderId,
+                    Optional.empty(),
+                    replicaIds,
+                    inSyncReplicaIds,
+                    offlineReplicaIds);
         }
 
         @Override
         public String toString() {
-            return "(type=PartitionMetadata" +
+            return "PartitionMetadata(" +
                     ", error=" + error +
-                    ", partition=" + partition +
-                    ", leader=" + leader +
+                    ", partition=" + topicPartition +
+                    ", leader=" + leaderId +
                     ", leaderEpoch=" + leaderEpoch +
-                    ", replicas=" + Utils.join(replicas, ",") +
-                    ", isr=" + Utils.join(isr, ",") +
-                    ", offlineReplicas=" + Utils.join(offlineReplicas, ",") + ')';
+                    ", replicas=" + Utils.join(replicaIds, ",") +
+                    ", isr=" + Utils.join(inSyncReplicaIds, ",") +
+                    ", offlineReplicas=" + Utils.join(offlineReplicaIds, ",") + ')';
         }
     }
 
     private static class Holder {
-        private final Collection<Node> brokers;
+        private final Map<Integer, Node> brokers;
         private final Node controller;
         private final Collection<TopicMetadata> topicMetadata;
 
         Holder(MetadataResponseData data) {
-            this.brokers = Collections.unmodifiableCollection(createBrokers(data));
-            Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
-            this.topicMetadata = createTopicMetadata(data, brokerMap);
-            this.controller = brokerMap.get(data.controllerId());
+            this.brokers = Collections.unmodifiableMap(createBrokers(data));
+            this.topicMetadata = createTopicMetadata(data);
+            this.controller = brokers.get(data.controllerId());
         }
 
-        private Collection<Node> createBrokers(MetadataResponseData data) {
-            return data.brokers().valuesList().stream().map(b ->
-                    new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
+        private Map<Integer, Node> createBrokers(MetadataResponseData data) {
+            return data.brokers().valuesList().stream().map(b -> new Node(b.nodeId(), b.host(), b.port(), b.rack()))
+                    .collect(Collectors.toMap(Node::id, Function.identity()));
         }
 
-        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data) {
             List<TopicMetadata> topicMetadataList = new ArrayList<>();
             for (MetadataResponseTopic topicMetadata : data.topics()) {
                 Errors topicError = Errors.forCode(topicMetadata.errorCode());
@@ -411,14 +408,15 @@ public class MetadataResponse extends AbstractResponse {
                 for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
                     Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
                     int partitionIndex = partitionMetadata.partitionIndex();
-                    int leader = partitionMetadata.leaderId();
+
+                    int leaderId = partitionMetadata.leaderId();
+                    Optional<Integer> leaderIdOpt = leaderId < 0 ? Optional.empty() : Optional.of(leaderId);
+
                     Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
-                    Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
-                    List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
-                    List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
-                    List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
-                    partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
-                            replicaNodes, isrNodes, offlineNodes));
+                    TopicPartition topicPartition = new TopicPartition(topic, partitionIndex);
+                    partitionMetadataList.add(new PartitionMetadata(partitionError, topicPartition, leaderIdOpt,
+                            leaderEpoch, partitionMetadata.replicaNodes(), partitionMetadata.isrNodes(),
+                            partitionMetadata.offlineReplicas()));
                 }
 
                 topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
@@ -427,18 +425,6 @@ public class MetadataResponse extends AbstractResponse {
             return topicMetadataList;
         }
 
-        private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
-            List<Node> nodes = new ArrayList<>(brokerIds.size());
-            for (Integer brokerId : brokerIds) {
-                Node node = brokers.get(brokerId);
-                if (node == null)
-                    nodes.add(new Node(brokerId, "", -1));
-                else
-                    nodes.add(node);
-            }
-            return nodes;
-        }
-
     }
 
     public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
@@ -469,12 +455,12 @@ public class MetadataResponse extends AbstractResponse {
             for (PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata) {
                 metadataResponseTopic.partitions().add(new MetadataResponsePartition()
                     .setErrorCode(partitionMetadata.error.code())
-                    .setPartitionIndex(partitionMetadata.partition)
-                    .setLeaderId(partitionMetadata.leader == null ? -1 : partitionMetadata.leader.id())
-                    .setLeaderEpoch(partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-                    .setReplicaNodes(partitionMetadata.replicas.stream().map(Node::id).collect(Collectors.toList()))
-                    .setIsrNodes(partitionMetadata.isr.stream().map(Node::id).collect(Collectors.toList()))
-                    .setOfflineReplicas(partitionMetadata.offlineReplicas.stream().map(Node::id).collect(Collectors.toList())));
+                    .setPartitionIndex(partitionMetadata.partition())
+                    .setLeaderId(partitionMetadata.leaderId.orElse(NO_LEADER_ID))
+                    .setLeaderEpoch(partitionMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    .setReplicaNodes(partitionMetadata.replicaIds)
+                    .setIsrNodes(partitionMetadata.inSyncReplicaIds)
+                    .setOfflineReplicas(partitionMetadata.offlineReplicaIds));
             }
             responseData.topics().add(metadataResponseTopic);
         });
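
The serialization above reads the refactored PartitionMetadata, which now carries broker ids and a TopicPartition instead of Node references. For orientation, a minimal sketch of constructing one with the new signature; the topic name, broker ids and epoch below are made-up values and not part of this patch:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.MetadataResponse;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Optional;

    public class PartitionMetadataSketch {
        static MetadataResponse.PartitionMetadata example() {
            // Hypothetical topic, broker ids and epoch, purely for illustration.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            return new MetadataResponse.PartitionMetadata(
                    Errors.NONE,
                    tp,
                    Optional.of(1),                 // leaderId; Optional.empty() when no leader is known
                    Optional.of(42),                // leaderEpoch; Optional.empty() for older broker versions
                    Arrays.asList(1, 2, 3),         // replicaIds
                    Arrays.asList(1, 2),            // inSyncReplicaIds
                    Collections.singletonList(3));  // offlineReplicaIds
        }
    }
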
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index 2b1f04c..be72edb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 import java.util.Optional;
@@ -33,15 +33,12 @@ public final class RequestUtils {
 
     static Optional<Integer> getLeaderEpoch(Struct struct, Field.Int32 leaderEpochField) {
         int leaderEpoch = struct.getOrElse(leaderEpochField, RecordBatch.NO_PARTITION_LEADER_EPOCH);
-        Optional<Integer> leaderEpochOpt = leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-                Optional.empty() : Optional.of(leaderEpoch);
-        return leaderEpochOpt;
+        return getLeaderEpoch(leaderEpoch);
     }
 
     static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
-        Optional<Integer> leaderEpochOpt = leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
+        return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
             Optional.empty() : Optional.of(leaderEpoch);
-        return leaderEpochOpt;
     }
 
     public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
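
The consolidated getLeaderEpoch helper maps the wire sentinel RecordBatch.NO_PARTITION_LEADER_EPOCH (-1) to Optional.empty(), and MetadataResponse applies the same convention to negative leader ids. The helper itself is package-private, so the following is only a standalone sketch that mirrors the convention:

    import org.apache.kafka.common.record.RecordBatch;

    import java.util.Optional;

    public class SentinelSketch {
        // -1 on the wire becomes an absent Optional in the client-side representation.
        static Optional<Integer> leaderEpochOpt(int leaderEpoch) {
            return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
                    Optional.empty() : Optional.of(leaderEpoch);
        }

        // Leader ids follow the same pattern: any negative id means "no leader".
        static Optional<Integer> leaderIdOpt(int leaderId) {
            return leaderId < 0 ? Optional.empty() : Optional.of(leaderId);
        }
    }
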
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
new file mode 100644
index 0000000..afdf89c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MetadataCacheTest {
+
+    @Test
+    public void testMissingLeaderEndpoint() {
+        // Although the broker attempts to ensure leader information is available, the
+        // client metadata cache may retain partition metadata across multiple responses.
+        // For example, separate responses may contain conflicting leader epochs for
+        // separate partitions and the client will always retain the highest.
+
+        TopicPartition topicPartition = new TopicPartition("topic", 0);
+
+        MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                topicPartition,
+                Optional.of(5),
+                Optional.of(10),
+                Arrays.asList(5, 6, 7),
+                Arrays.asList(5, 6, 7),
+                Collections.emptyList());
+
+        Map<Integer, Node> nodesById = new HashMap<>();
+        nodesById.put(6, new Node(6, "localhost", 2077));
+        nodesById.put(7, new Node(7, "localhost", 2078));
+        nodesById.put(8, new Node(8, "localhost", 2079));
+
+        MetadataCache cache = new MetadataCache("clusterId",
+                nodesById,
+                Collections.singleton(partitionMetadata),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                null);
+
+        Cluster cluster = cache.cluster();
+        assertNull(cluster.leaderFor(topicPartition));
+
+        PartitionInfo partitionInfo = cluster.partition(topicPartition);
+        Map<Integer, Node> replicas = Arrays.stream(partitionInfo.replicas())
+                .collect(Collectors.toMap(Node::id, Function.identity()));
+        assertNull(partitionInfo.leader());
+        assertEquals(3, replicas.size());
+        assertTrue(replicas.get(5).isEmpty());
+        assertEquals(nodesById.get(6), replicas.get(6));
+        assertEquals(nodesById.get(7), replicas.get(7));
+    }
+
+}
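
The new test pins down the cache behavior this patch relies on: a replica id with no matching broker in the cached metadata surfaces as a placeholder Node (isEmpty() returns true) and the leader resolves to null. A hedged sketch of how calling code might skip such placeholders; the method name is illustrative:

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    import java.util.ArrayList;
    import java.util.List;

    public class PlaceholderNodeSketch {
        // Collects only the replicas whose endpoints are known. Replica ids with no matching
        // broker in the cached metadata surface as placeholder nodes (isEmpty() == true).
        static List<Node> resolvableReplicas(Cluster cluster, TopicPartition tp) {
            List<Node> result = new ArrayList<>();
            PartitionInfo info = cluster.partition(tp);
            if (info == null)
                return result;
            for (Node replica : info.replicas()) {
                if (!replica.isEmpty())
                    result.add(replica);
            }
            return result;
        }
    }
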
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 7067e88..3dc4c9a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -196,9 +195,9 @@ public class MetadataTest {
             MetadataResponse response = new MetadataResponse(struct, version);
             assertFalse(response.hasReliableLeaderEpochs());
             metadata.update(response, 100);
-            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
-            assertEquals(-1, info.epoch());
+            assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+            MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.empty(), metadata.leaderEpoch);
         }
 
         for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
@@ -206,9 +205,9 @@ public class MetadataTest {
             MetadataResponse response = new MetadataResponse(struct, version);
             assertTrue(response.hasReliableLeaderEpochs());
             metadata.update(response, 100);
-            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
-            assertEquals(10, info.epoch());
+            assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+            MetadataResponse.PartitionMetadata info = metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.of(10), info.leaderEpoch);
         }
     }
 
@@ -255,13 +254,11 @@ public class MetadataTest {
         metadata.update(new MetadataResponse(data), 101);
         assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
 
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-        MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+        MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
 
-        List<Integer> cachedIsr = Arrays.stream(info.partitionInfo().inSyncReplicas())
-                .map(Node::id).collect(Collectors.toList());
-        assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
-        assertEquals(10, info.epoch());
+        assertEquals(Arrays.asList(1, 2, 3), metadata.inSyncReplicaIds);
+        assertEquals(Optional.of(10), metadata.leaderEpoch);
     }
 
     @Test
@@ -419,14 +416,14 @@ public class MetadataTest {
         // Cache of partition stays, but current partition info is not available since it's stale
         assertNotNull(metadata.fetch().partition(tp));
         assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
         // Metadata with older epoch is rejected, metadata state is unchanged
         metadata.update(metadataResponse, 20L);
         assertNotNull(metadata.fetch().partition(tp));
         assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
         // Metadata with equal or newer epoch is accepted
@@ -434,7 +431,7 @@ public class MetadataTest {
         metadata.update(metadataResponse, 30L);
         assertNotNull(metadata.fetch().partition(tp));
         assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
     }
 
@@ -450,9 +447,9 @@ public class MetadataTest {
         assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
 
         // still works
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-        assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().partition(), 0);
-        assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().leader().id(), 0);
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+        assertEquals(0, metadata.partitionMetadataIfCurrent(tp).get().partition());
+        assertEquals(Optional.of(0), metadata.partitionMetadataIfCurrent(tp).get().leaderId);
     }
 
     @Test
@@ -611,8 +608,9 @@ public class MetadataTest {
 
         MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 2, Collections.emptyMap(), partitionCounts, _tp -> 99,
             (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
-                new MetadataResponse.PartitionMetadata(error, partition, node0, leaderEpoch,
-                    Collections.singletonList(node0), Collections.emptyList(), Collections.singletonList(node1)));
+                new MetadataResponse.PartitionMetadata(error, partition, Optional.of(node0.id()), leaderEpoch,
+                    Collections.singletonList(node0.id()), Collections.emptyList(),
+                        Collections.singletonList(node1.id())));
         metadata.update(emptyMetadataResponse(), 0L);
         metadata.update(metadataResponse, 10L);
 
@@ -623,4 +621,79 @@ public class MetadataTest {
         assertEquals(metadata.fetch().nodeById(0).id(), 0);
         assertEquals(metadata.fetch().nodeById(1).id(), 1);
     }
+
+    @Test
+    public void testLeaderMetadataInconsistentWithBrokerMetadata() {
+        // Tests a reordering scenario which can lead to inconsistent leader state.
+        // A partition initially has one broker offline. That broker comes online and
+        // is elected leader. The client sees these two events in the opposite order.
+
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node2 = new Node(2, "localhost", 9094);
+
+        // The first metadata response received by the client (epoch=10)
+        MetadataResponsePartition firstPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(10)
+                .setLeaderId(0)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(0, 1, 2))
+                .setOfflineReplicas(Collections.emptyList());
+
+        // The second metadata response received by the client is stale (epoch=8)
+        MetadataResponsePartition secondPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(8)
+                .setLeaderId(1)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(1, 2))
+                .setOfflineReplicas(Collections.singletonList(0));
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), firstPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node0, node1, node2)))),
+                10L);
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), secondPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node1, node2)))),
+                20L);
+
+        assertNull(metadata.fetch().leaderFor(tp));
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertFalse(metadata.currentLeader(tp).leader.isPresent());
+    }
+
+    private MetadataResponseTopicCollection buildTopicCollection(String topic, MetadataResponsePartition partitionMetadata) {
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setIsInternal(false);
+
+        topicMetadata.setPartitions(Collections.singletonList(partitionMetadata));
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+        return topics;
+    }
+
+    private MetadataResponseBrokerCollection buildBrokerCollection(List<Node> nodes) {
+        MetadataResponseBrokerCollection brokers = new MetadataResponseBrokerCollection();
+        for (Node node : nodes) {
+            MetadataResponseData.MetadataResponseBroker broker = new MetadataResponseData.MetadataResponseBroker()
+                    .setNodeId(node.id())
+                    .setHost(node.host())
+                    .setPort(node.port())
+                    .setRack(node.rack());
+            brokers.add(broker);
+        }
+        return brokers;
+    }
+
+
 }
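
The reordering test above shows that when the retained leader id has no endpoint in the latest broker set, Metadata.currentLeader reports an absent leader rather than a fabricated Node. A small sketch of consuming that Optional-based result; only the method and field names from the diff are assumed, the wrapper itself is illustrative:

    import org.apache.kafka.clients.Metadata;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Optional;

    public class CurrentLeaderSketch {
        // Returns the leader node only if the cache knows both the leader id and its endpoint;
        // an empty result is the signal to request a metadata update.
        static Optional<Node> leaderIfKnown(Metadata metadata, TopicPartition tp) {
            Metadata.LeaderAndEpoch current = metadata.currentLeader(tp);
            return current.leader;
        }
    }
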
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c869aaa..cba9b48 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -347,12 +347,12 @@ public class KafkaAdminClientTest {
             List<PartitionMetadata> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 PartitionMetadata pm = new PartitionMetadata(error,
-                        pInfo.partition(),
-                        pInfo.leader(),
+                        new TopicPartition(topic, pInfo.partition()),
+                        Optional.of(pInfo.leader().id()),
                         Optional.of(234),
-                        Arrays.asList(pInfo.replicas()),
-                        Arrays.asList(pInfo.inSyncReplicas()),
-                        Arrays.asList(pInfo.offlineReplicas()));
+                        Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()),
+                        Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()),
+                        Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
                 pms.add(pm);
             }
             TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
@@ -614,8 +614,8 @@ public class KafkaAdminClientTest {
             // Then we respond to the DescribeTopic request
             Node leader = initializedCluster.nodes().get(0);
             MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
-                    Errors.NONE, 0, leader, Optional.of(10), singletonList(leader),
-                    singletonList(leader), singletonList(leader));
+                    Errors.NONE, new TopicPartition(topic, 0), Optional.of(leader.id()), Optional.of(10),
+                    singletonList(leader.id()), singletonList(leader.id()), singletonList(leader.id()));
             env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
                     initializedCluster.clusterResource().clusterId(), 1,
                     singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
@@ -960,11 +960,12 @@ public class KafkaAdminClientTest {
             List<Node> nodes = env.cluster().nodes();
 
             List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
-                    Optional.of(5), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
-                    Collections.emptyList()));
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(), nodes.get(1),
-                    Optional.of(5), singletonList(nodes.get(1)), singletonList(nodes.get(1)), Collections.emptyList()));
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), Collections.emptyList()));
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1,
+                    Optional.of(nodes.get(1).id()), Optional.of(5), singletonList(nodes.get(1).id()),
+                    singletonList(nodes.get(1).id()), Collections.emptyList()));
 
             List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false, partitionMetadata));
@@ -1024,17 +1025,21 @@ public class KafkaAdminClientTest {
 
             List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
             List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
-                    Optional.empty(), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
-                    Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition0,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition1,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, myTopicPartition2,
+                    Optional.empty(), Optional.empty(), singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition3,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition4,
+                    Optional.of(nodes.get(0).id()), Optional.of(5), singletonList(nodes.get(0).id()),
+                    singletonList(nodes.get(0).id()), Collections.emptyList()));
 
             t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b0b6fb8..e6483f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1391,8 +1391,9 @@ public class ConsumerCoordinatorTest {
             subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
             Node node = new Node(0, "localhost", 9999);
             MetadataResponse.PartitionMetadata partitionMetadata =
-                new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node, Optional.empty(),
-                    singletonList(node), singletonList(node), singletonList(node));
+                new MetadataResponse.PartitionMetadata(Errors.NONE, new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+                        Optional.of(node.id()), Optional.empty(), singletonList(node.id()), singletonList(node.id()),
+                        singletonList(node.id()));
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
                 Topic.GROUP_METADATA_TOPIC_NAME, true, singletonList(partitionMetadata));
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index b373192..b456e6c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -160,7 +160,8 @@ public class ConsumerMetadataTest {
 
     private MetadataResponse.TopicMetadata topicMetadata(String topic, boolean isInternal) {
         MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE,
-                0, node, Optional.of(5), singletonList(node), singletonList(node), singletonList(node));
+                new TopicPartition(topic, 0), Optional.of(node.id()), Optional.of(5),
+                singletonList(node.id()), singletonList(node.id()), singletonList(node.id()));
         return new MetadataResponse.TopicMetadata(Errors.NONE, topic, isInternal, singletonList(partitionMetadata));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2e45935..843e1bf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1946,14 +1946,14 @@ public class FetcherTest {
             List<MetadataResponse.PartitionMetadata> altPartitions = new ArrayList<>();
             for (MetadataResponse.PartitionMetadata p : partitions) {
                 altPartitions.add(new MetadataResponse.PartitionMetadata(
-                    p.error(),
-                    p.partition(),
-                    null, //no leader
+                    p.error,
+                    p.topicPartition,
+                    Optional.empty(), //no leader
                     Optional.empty(),
-                    p.replicas(),
-                    p.isr(),
-                    p.offlineReplicas())
-                );
+                    p.replicaIds,
+                    p.inSyncReplicaIds,
+                    p.offlineReplicaIds
+                ));
             }
             MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata(
                 item.error(),
@@ -3023,8 +3023,8 @@ public class FetcherTest {
 
         List<ConsumerRecord<byte[], byte[]>> records;
         assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
-        subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.leaderAndEpoch(tp0)));
-        subscriptions.seekValidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1)));
+        subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.currentLeader(tp0)));
+        subscriptions.seekValidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1)));
 
         // Fetch some records and establish an incremental fetch session.
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions1 = new LinkedHashMap<>();
@@ -3503,7 +3503,7 @@ public class FetcherTest {
 
         // Seek with a position and leader+epoch
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+                metadata.currentLeader(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
         assertFalse(client.isConnected(node.idString()));
         assertTrue(subscriptions.awaitingValidation(tp0));
@@ -3552,7 +3552,7 @@ public class FetcherTest {
 
         // Seek with a position and leader+epoch
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+                metadata.currentLeader(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         // Update metadata to epoch=2, enter validation
@@ -3580,7 +3580,7 @@ public class FetcherTest {
         Node node = metadata.fetch().nodes().get(0);
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
-        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         fetcher.validateOffsetsIfNeeded();
@@ -3623,7 +3623,7 @@ public class FetcherTest {
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
         // Seek with a position and leader+epoch
-        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(epochOne));
         subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         // Update metadata to epoch=2, enter validation
@@ -3693,7 +3693,7 @@ public class FetcherTest {
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
         // Seek
-        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(1));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
         subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch));
 
         // Check for truncation, this should cause tp0 to go into validation
@@ -3817,19 +3817,22 @@ public class FetcherTest {
         client.prepareResponse(fullFetchResponse(tp0, buildRecords(1L, 1, 1), Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
         fetchedRecords();
-        Node node = fetcher.selectReadReplica(tp0, subscriptions.position(tp0).currentLeader.leader, time.milliseconds());
+
+        Metadata.LeaderAndEpoch leaderAndEpoch = subscriptions.position(tp0).currentLeader;
+        assertTrue(leaderAndEpoch.leader.isPresent());
+        Node readReplica = fetcher.selectReadReplica(tp0, leaderAndEpoch.leader.get(), time.milliseconds());
 
         AtomicBoolean wokenUp = new AtomicBoolean(false);
         client.setWakeupHook(() -> {
             if (!wokenUp.getAndSet(true)) {
-                consumerClient.disconnectAsync(node);
+                consumerClient.disconnectAsync(readReplica);
                 consumerClient.poll(time.timer(0));
             }
         });
 
         assertEquals(1, fetcher.sendFetches());
 
-        consumerClient.disconnectAsync(node);
+        consumerClient.disconnectAsync(readReplica);
         consumerClient.poll(time.timer(0));
 
         assertEquals(1, fetcher.sendFetches());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index 55b8754..b3339ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -69,7 +69,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testUnexpectedEmptyResponse() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
@@ -88,7 +88,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testOkResponse() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
@@ -111,7 +111,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testUnauthorizedTopic() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
@@ -131,7 +131,7 @@ public class OffsetForLeaderEpochClientTest {
     public void testRetriableError() {
         Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new HashMap<>();
         positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1),
-                new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+                new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
 
         OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
         RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 6e1e8a3..74048ad 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -51,7 +51,7 @@ public class SubscriptionStateTest {
     private final TopicPartition tp1 = new TopicPartition(topic, 1);
     private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
     private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
-    private final Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty());
+    private final Metadata.LeaderAndEpoch leaderAndEpoch = Metadata.LeaderAndEpoch.noLeaderOrEpoch();
 
     @Test
     public void partitionAssignment() {
@@ -363,15 +363,17 @@ public class SubscriptionStateTest {
 
         // Seek with no offset epoch requires no validation no matter what the current leader is
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
 
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.empty())));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
 
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(10))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
     }
@@ -383,12 +385,12 @@ public class SubscriptionStateTest {
 
         // Seek with no offset epoch requires no validation no matter what the current leader is
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
     }
@@ -399,22 +401,25 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // Update using the current leader and epoch
-        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(5))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // Update with a newer leader and epoch
-        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.of(15))));
+        assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(15))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
 
         // If the updated leader has no epoch information, then skip validation and begin fetching
-        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.empty())));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
     }
@@ -425,13 +430,13 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
         assertEquals(10L, state.position(tp0).offset);
 
         state.seekValidated(tp0, new SubscriptionState.FetchPosition(8L, Optional.of(4),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
         assertTrue(state.hasValidPosition(tp0));
         assertFalse(state.awaitingValidation(tp0));
         assertEquals(8L, state.position(tp0).offset);
@@ -443,7 +448,7 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
         assertFalse(state.hasValidPosition(tp0));
         assertTrue(state.awaitingValidation(tp0));
         assertEquals(10L, state.position(tp0).offset);
@@ -460,7 +465,7 @@ public class SubscriptionStateTest {
         state.assignFromUser(Collections.singleton(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
-                new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+                new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
         assertTrue(state.awaitingValidation(tp0));
 
         state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
@@ -478,7 +483,7 @@ public class SubscriptionStateTest {
         int initialOffsetEpoch = 5;
 
         SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
@@ -501,12 +506,12 @@ public class SubscriptionStateTest {
         int updateOffsetEpoch = 8;
 
         SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
         SubscriptionState.FetchPosition updatePosition = new SubscriptionState.FetchPosition(updateOffset,
-                Optional.of(updateOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(updateOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, updatePosition);
 
         Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition,
@@ -526,7 +531,7 @@ public class SubscriptionStateTest {
         int initialOffsetEpoch = 5;
 
         SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
@@ -551,7 +556,7 @@ public class SubscriptionStateTest {
         int divergentOffsetEpoch = 7;
 
         SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
@@ -561,7 +566,7 @@ public class SubscriptionStateTest {
         assertFalse(state.awaitingValidation(tp0));
 
         SubscriptionState.FetchPosition updatedPosition = new SubscriptionState.FetchPosition(divergentOffset,
-                Optional.of(divergentOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(divergentOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         assertEquals(updatedPosition, state.position(tp0));
     }
 
@@ -578,7 +583,7 @@ public class SubscriptionStateTest {
         int divergentOffsetEpoch = 7;
 
         SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
-                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, initialPosition);
         assertTrue(state.awaitingValidation(tp0));
 
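
Throughout the consumer tests, Metadata.LeaderAndEpoch now takes Optional<Node> for the leader, and noLeaderOrEpoch() replaces the old Node.noNode() placeholder. A brief sketch of both forms; the broker host and port are made up:

    import org.apache.kafka.clients.Metadata;
    import org.apache.kafka.common.Node;

    import java.util.Optional;

    public class LeaderAndEpochSketch {
        public static void main(String[] args) {
            Node broker = new Node(1, "localhost", 9092);   // hypothetical broker

            // Leader and epoch both known.
            Metadata.LeaderAndEpoch known =
                    new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(5));

            // Nothing known; replaces the old new LeaderAndEpoch(Node.noNode(), Optional.empty()).
            Metadata.LeaderAndEpoch unknown = Metadata.LeaderAndEpoch.noLeaderOrEpoch();

            System.out.println(known.leader + " / " + unknown.leader);
        }
    }
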
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8330e6a..487eabf 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -169,6 +169,8 @@ import java.util.Optional;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.protocol.ApiKeys.FETCH;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.apache.kafka.test.TestUtils.toBuffer;
@@ -637,7 +639,7 @@ public class RequestResponseTest {
         responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData<>(Errors.NONE, 900000,
                 5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
         responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), Collections.emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), emptyList(), records));
 
         FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -1186,19 +1188,21 @@ public class RequestResponseTest {
 
     private MetadataResponse createMetadataResponse() {
         Node node = new Node(1, "host1", 1001);
-        List<Node> replicas = asList(node);
-        List<Node> isr = asList(node);
-        List<Node> offlineReplicas = asList();
+        List<Integer> replicas = singletonList(node.id());
+        List<Integer> isr = singletonList(node.id());
+        List<Integer> offlineReplicas = emptyList();
 
         List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
-                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node,
-                        Optional.of(5), replicas, isr, offlineReplicas))));
+                asList(new MetadataResponse.PartitionMetadata(Errors.NONE,
+                        new TopicPartition("__consumer_offsets", 1),
+                        Optional.of(node.id()), Optional.of(5), replicas, isr, offlineReplicas))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
-                Collections.emptyList()));
+                emptyList()));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false,
-            asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
-                Optional.empty(), replicas, isr, offlineReplicas))));
+            asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
+                    new TopicPartition("topic3", 0), Optional.empty(),
+                    Optional.empty(), replicas, isr, offlineReplicas))));
 
         return MetadataResponse.prepareResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
     }
@@ -1808,13 +1812,13 @@ public class RequestResponseTest {
         Map<ConfigResource, Collection<String>> resources = new HashMap<>();
         resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), asList("foo", "bar"));
         resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), null);
-        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic a"), Collections.emptyList());
+        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic a"), emptyList());
         return new DescribeConfigsRequest.Builder(resources).build((short) version);
     }
 
     private DescribeConfigsResponse createDescribeConfigsResponse() {
         Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>();
-        List<DescribeConfigsResponse.ConfigSynonym> synonyms = Collections.emptyList();
+        List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList();
         List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
                 new DescribeConfigsResponse.ConfigEntry("config_name", "config_value",
                         DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms),
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 5782b6d..ad2ad99 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -159,9 +159,10 @@ public class TestUtils {
             for (int i = 0; i < numPartitions; i++) {
                 TopicPartition tp = new TopicPartition(topic, i);
                 Node leader = nodes.get(i % nodes.size());
-                List<Node> replicas = Collections.singletonList(leader);
+                List<Integer> replicaIds = Collections.singletonList(leader.id());
                 partitionMetadata.add(partitionSupplier.supply(
-                        Errors.NONE, i, leader, Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
+                        Errors.NONE, tp, Optional.of(leader.id()), Optional.ofNullable(epochSupplier.apply(tp)),
+                        replicaIds, replicaIds, replicaIds));
             }
 
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
@@ -180,12 +181,12 @@ public class TestUtils {
     @FunctionalInterface
     public interface PartitionMetadataSupplier {
         MetadataResponse.PartitionMetadata supply(Errors error,
-                              int partition,
-                              Node leader,
+                              TopicPartition partition,
+                              Optional<Integer> leaderId,
                               Optional<Integer> leaderEpoch,
-                              List<Node> replicas,
-                              List<Node> isr,
-                              List<Node> offlineReplicas);
+                              List<Integer> replicas,
+                              List<Integer> isr,
+                              List<Integer> offlineReplicas);
     }
 
     public static Cluster clusterWith(final int nodes, final String topic, final int partitions) {
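
TestUtils' PartitionMetadataSupplier is id-based as well, so custom suppliers forward ids straight into the new PartitionMetadata constructor. A sketch of a supplier that deliberately reports no leader, in the style of the MetadataTest usage earlier in this patch:

    import org.apache.kafka.common.requests.MetadataResponse;
    import org.apache.kafka.test.TestUtils;

    import java.util.Optional;

    public class SupplierSketch {
        // A supplier that deliberately reports no leader, forwarding everything else as-is.
        static final TestUtils.PartitionMetadataSupplier NO_LEADER =
                (error, partition, leaderId, leaderEpoch, replicas, isr, offlineReplicas) ->
                        new MetadataResponse.PartitionMetadata(error, partition, Optional.empty(),
                                leaderEpoch, replicas, isr, offlineReplicas);
    }
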
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e3adb04..8a14cd7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1290,11 +1290,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else {
           val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
             .find(_.partition == partition)
-            .map(_.leader)
-            .flatMap(p => Option(p))
+            .filter(_.leaderId.isPresent)
+            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId.get))
+            .flatMap(_.getNode(request.context.listenerName))
+            .filterNot(_.isEmpty)
 
           coordinatorEndpoint match {
-            case Some(endpoint) if !endpoint.isEmpty =>
+            case Some(endpoint) =>
               createFindCoordinatorResponse(Errors.NONE, endpoint)
             case _ =>
               createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 1ad0e41..10c25b5 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.{Collections, Optional}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -55,18 +56,22 @@ class MetadataCache(brokerId: Int) extends Logging {
 
   // This method is the main hotspot when it comes to the performance of metadata requests,
   // we should be careful about adding additional logic here. Relatedly, `brokers` is
-  // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
   // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
-    val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
-    brokers.foreach { brokerId =>
-      val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
-        case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
-        case Some(node) => Some(node)
+  private def maybeFilterAliveReplicas(snapshot: MetadataSnapshot,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(snapshot.aliveBrokers.size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(snapshot, brokerId, listenerName))
+          res.add(brokerId)
       }
-      endpoint.foreach(result +=)
+      res
     }
-    result
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
@@ -80,53 +85,71 @@ class MetadataCache(brokerId: Int) extends Logging {
         val leaderBrokerId = partitionState.leader
         val leaderEpoch = partitionState.leaderEpoch
         val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
-        val replicas = partitionState.replicas.asScala
-        val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
-        val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
 
-        val isr = partitionState.isr.asScala
-        val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
+        val replicas = partitionState.replicas
+        val filteredReplicas = maybeFilterAliveReplicas(snapshot, replicas, listenerName, errorUnavailableEndpoints)
+
+        val isr = partitionState.isr
+        val filteredIsr = maybeFilterAliveReplicas(snapshot, isr, listenerName, errorUnavailableEndpoints)
+
+        val offlineReplicas = partitionState.offlineReplicas
+
         maybeLeader match {
           case None =>
             val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
               debug(s"Error while fetching metadata for $topicPartition: leader not available")
               Errors.LEADER_NOT_AVAILABLE
             } else {
-              debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
+              debug(s"Error while fetching metadata for $topicPartition: listener $listenerName " +
+                s"not found on leader $leaderBrokerId")
               if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
             }
-            new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(),
-              Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
-              offlineReplicaInfo.asJava)
+            new MetadataResponse.PartitionMetadata(error, topicPartition, Optional.empty(),
+              Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
 
           case Some(leader) =>
-            if (replicaInfo.size < replicas.size) {
+            if (filteredReplicas.size < replicas.size) {
               debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
-                s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
-
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
-                Optional.empty(), replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
-            } else if (isrInfo.size < isr.size) {
+                s"following brokers ${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
+                Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
+            } else if (filteredIsr.size < isr.size) {
               debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
-                s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
-                Optional.empty(), replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
+                s"following brokers ${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
+                Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
             } else {
-              new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId.toInt, leader, Optional.of(leaderEpoch),
-                replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
+              new MetadataResponse.PartitionMetadata(Errors.NONE, topicPartition,
+                Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
             }
         }
       }
     }
   }
 
-  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] =
-    // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
-    // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
+  /**
+   * Check whether a broker is alive and has a registered listener matching the provided name.
+   * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
+   * a hotspot in metadata handling.
+   */
+  private def hasAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Boolean = {
+    snapshot.aliveNodes.get(brokerId).exists(_.contains(listenerName))
+  }
+
+  /**
+   * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+   * be added dynamically, so a broker with a missing listener could be a transient error.
+   *
+   * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+   */
+  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] = {
     snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
+  }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false,
+  def getTopicMetadata(topics: Set[String],
+                       listenerName: ListenerName,
+                       errorUnavailableEndpoints: Boolean = false,
                        errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
     val snapshot = metadataSnapshot
     topics.toSeq.flatMap { topic =>
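
    As the comment in the hunk above notes, this path is a hotspot: the old `getEndpoints` built a fresh buffer
    of `Node` objects for every replica list even though the serialized response only needs broker ids. A
    minimal sketch of the new behavior, with `aliveBrokerIds` standing in for the snapshot lookup (the committed
    code checks the listener via `hasAliveEndpoint`):

        import java.util
        import scala.collection.JavaConverters._

        def maybeFilterReplicaIds(replicaIds: util.List[Integer],
                                  aliveBrokerIds: Set[Int],
                                  filterUnavailableEndpoints: Boolean): util.List[Integer] = {
          if (!filterUnavailableEndpoints) {
            // v1+ responses: return the cached list untouched, no per-request allocation
            replicaIds
          } else {
            // v0 responses: drop replicas whose brokers have no live endpoint
            val res = new util.ArrayList[Integer](replicaIds.size)
            replicaIds.asScala.foreach { id =>
              if (aliveBrokerIds.contains(id)) res.add(id)
            }
            res
          }
        }

        // e.g. replicas (0, 1, 2) with broker 2 offline:
        //   filterUnavailableEndpoints = true  => [0, 1]
        //   filterUnavailableEndpoints = false => [0, 1, 2]
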
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index efd721d..cdb4c0a 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.admin
 
+import java.util.Optional
+
 import kafka.controller.ReplicaAssignment
 import kafka.server.BaseRequestTest
 import kafka.utils.TestUtils
@@ -101,9 +103,14 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(partitions.size, 3)
     assertEquals(1, partitions(1).partition)
     assertEquals(2, partitions(2).partition)
-    val replicas = partitions(1).replicas
-    assertEquals(replicas.size, 2)
-    assertTrue(replicas.contains(partitions(1).leader))
+
+    for (partition <- partitions) {
+      val replicas = partition.replicaIds
+      assertEquals(2, replicas.size)
+      assertTrue(partition.leaderId.isPresent)
+      val leaderId = partition.leaderId.get
+      assertTrue(replicas.contains(leaderId))
+    }
   }
 
   @Test
@@ -131,10 +138,9 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(0, partitionMetadata(0).partition)
     assertEquals(1, partitionMetadata(1).partition)
     assertEquals(2, partitionMetadata(2).partition)
-    val replicas = partitionMetadata(1).replicas
+    val replicas = partitionMetadata(1).replicaIds
     assertEquals(2, replicas.size)
-    assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
-    assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
+    assertEquals(Set(0, 1), replicas.asScala.toSet)
   }
 
   @Test
@@ -185,9 +191,8 @@ class AddPartitionsTest extends BaseRequestTest {
     assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
     val partition = partitionOpt.get
 
-    assertNotNull("Partition leader should exist", partition.leader)
-    assertEquals("Partition leader id should match", expectedLeaderId, partition.leaderId)
-    assertEquals("Replica set should match", expectedReplicas, partition.replicas.asScala.map(_.id).toSet)
+    assertEquals("Partition leader id should match", Optional.of(expectedLeaderId), partition.leaderId)
+    assertEquals("Replica set should match", expectedReplicas, partition.replicaIds.asScala.toSet)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index b110139..5281d80 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -584,7 +584,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
               testPartitionMetadata match {
                 case None => fail(s"Partition metadata is not found in metadata cache")
                 case Some(metadata) => {
-                  result && metadata.error() == Errors.LEADER_NOT_AVAILABLE
+                  result && metadata.error == Errors.LEADER_NOT_AVAILABLE
                 }
               }
             }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 5f1f671..8ca2c70 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -131,9 +131,9 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
 
           if (replication == -1) {
             assertEquals("The topic should have the default replication factor",
-              configs.head.defaultReplicationFactor, metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+              configs.head.defaultReplicationFactor, metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
           } else {
-            assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+            assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
           }
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index a42a86a..60cff13 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -128,16 +128,12 @@ class MetadataCacheTest {
         partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, partitionId) =>
           assertEquals(Errors.NONE, partitionMetadata.error)
           assertEquals(partitionId, partitionMetadata.partition)
-          val leader = partitionMetadata.leader
           val partitionState = topicPartitionStates.find(_.partitionIndex == partitionId).getOrElse(
             Assertions.fail(s"Unable to find partition state for partition $partitionId"))
-          assertEquals(partitionState.leader, leader.id)
+          assertEquals(Optional.of(partitionState.leader), partitionMetadata.leaderId)
           assertEquals(Optional.of(partitionState.leaderEpoch), partitionMetadata.leaderEpoch)
-          assertEquals(partitionState.isr, partitionMetadata.isr.asScala.map(_.id).asJava)
-          assertEquals(partitionState.replicas, partitionMetadata.replicas.asScala.map(_.id).asJava)
-          val endpoint = endpoints(partitionMetadata.leader.id).find(_.listener == listenerName.value).get
-          assertEquals(endpoint.host, leader.host)
-          assertEquals(endpoint.port, leader.port)
+          assertEquals(partitionState.isr, partitionMetadata.inSyncReplicaIds)
+          assertEquals(partitionState.replicas, partitionMetadata.replicaIds)
         }
       }
 
@@ -267,9 +263,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(expectedError, partitionMetadata.error)
-    assertFalse(partitionMetadata.isr.isEmpty)
-    assertEquals(1, partitionMetadata.replicas.size)
-    assertEquals(0, partitionMetadata.replicas.get(0).id)
+    assertFalse(partitionMetadata.inSyncReplicaIds.isEmpty)
+    assertEquals(List(0), partitionMetadata.replicaIds.asScala)
   }
 
   @Test
@@ -326,8 +321,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(Errors.NONE, partitionMetadata.error)
-    assertEquals(Set(0, 1), partitionMetadata.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0, 1), partitionMetadata.replicaIds.asScala.toSet)
+    assertEquals(Set(0), partitionMetadata.inSyncReplicaIds.asScala.toSet)
 
     // Validate errorUnavailableEndpoints = true
     val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true)
@@ -342,8 +337,8 @@ class MetadataCacheTest {
     val partitionMetadataWithError = partitionMetadatasWithError.get(0)
     assertEquals(0, partitionMetadataWithError.partition)
     assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
-    assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+    assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
   }
 
   @Test
@@ -400,8 +395,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(Errors.NONE, partitionMetadata.error)
-    assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadata.replicaIds.asScala.toSet)
+    assertEquals(Set(0, 1), partitionMetadata.inSyncReplicaIds.asScala.toSet)
 
     // Validate errorUnavailableEndpoints = true
     val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true)
@@ -416,8 +411,8 @@ class MetadataCacheTest {
     val partitionMetadataWithError = partitionMetadatasWithError.get(0)
     assertEquals(0, partitionMetadataWithError.partition)
     assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
-    assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+    assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
   }
 
   @Test
@@ -455,7 +450,7 @@ class MetadataCacheTest {
     val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals(1, topicMetadata.size)
     assertEquals(1, topicMetadata.head.partitionMetadata.size)
-    assertEquals(-1, topicMetadata.head.partitionMetadata.get(0).leaderId)
+    assertEquals(Optional.empty, topicMetadata.head.partitionMetadata.get(0).leaderId)
   }
 
   @Test
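
    The test changes above reflect the new client-facing shape of `MetadataResponse.PartitionMetadata`: broker
    ids (`replicaIds`, `inSyncReplicaIds`) instead of `Node` lists, and an Optional leader id that is empty when
    the leader has no usable endpoint (previously a `Node` with id -1). A small sketch of reading those fields;
    `describePartition` is just an illustrative helper, not part of the patch:

        import scala.collection.JavaConverters._
        import org.apache.kafka.common.requests.MetadataResponse

        def describePartition(pm: MetadataResponse.PartitionMetadata): String = {
          // leaderId is Optional.empty rather than a sentinel id when the leader is unavailable
          val leader = if (pm.leaderId.isPresent) pm.leaderId.get.toString else "none"
          s"partition=${pm.partition} error=${pm.error} " +
            s"leader=$leader replicas=${pm.replicaIds.asScala.mkString(",")} " +
            s"isr=${pm.inSyncReplicaIds.asScala.mkString(",")}"
        }
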
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index aa6a53f..599ae17 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -17,11 +17,10 @@
 
 package kafka.server
 
-import java.util.Properties
+import java.util.{Optional, Properties}
 
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
-import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataRequestData
@@ -202,8 +201,9 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(1, topicMetadata1.partitionMetadata.size)
     val partitionMetadata = topicMetadata1.partitionMetadata.asScala.head
     assertEquals(0, partitionMetadata.partition)
-    assertEquals(2, partitionMetadata.replicas.size)
-    assertNotNull(partitionMetadata.leader)
+    assertEquals(2, partitionMetadata.replicaIds.size)
+    assertTrue(partitionMetadata.leaderId.isPresent)
+    assertTrue(partitionMetadata.leaderId.get >= 0)
   }
 
   @Test
@@ -243,9 +243,9 @@ class MetadataRequestTest extends BaseRequestTest {
       assertEquals(Set(0, 1), topicMetadata.partitionMetadata.asScala.map(_.partition).toSet)
       topicMetadata.partitionMetadata.asScala.foreach { partitionMetadata =>
         val assignment = replicaAssignment(partitionMetadata.partition)
-        assertEquals(assignment, partitionMetadata.replicas.asScala.map(_.id))
-        assertEquals(assignment, partitionMetadata.isr.asScala.map(_.id))
-        assertEquals(assignment.head, partitionMetadata.leader.id)
+        assertEquals(assignment, partitionMetadata.replicaIds.asScala)
+        assertEquals(assignment, partitionMetadata.inSyncReplicaIds.asScala)
+        assertEquals(Optional.of(assignment.head), partitionMetadata.leaderId)
       }
     }
   }
@@ -268,23 +268,22 @@ class MetadataRequestTest extends BaseRequestTest {
     createTopic(replicaDownTopic, 1, replicaCount)
 
     // Kill a replica node that is not the leader
-    val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+    val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
     val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
       val serverId = server.dataPlaneRequestProcessor.brokerId
-      val leaderId = partitionMetadata.leader.id
-      val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
+      val leaderId = partitionMetadata.leaderId
+      val replicaIds = partitionMetadata.replicaIds.asScala
       serverId != leaderId && replicaIds.contains(serverId)
     }.get
     downNode.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      val response = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
-      val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
-      val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneRequestProcessor.brokerId).get
-      replica.host == "" & replica.port == -1
+      val response = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
+      !response.brokers.asScala.exists(_.id == downNode.dataPlaneRequestProcessor.brokerId)
     }, "Replica was not found down", 5000)
 
+
     // Validate version 0 still filters unavailable replicas and contains error
     val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(requestData(List(replicaDownTopic), true), 0.toShort))
     val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
@@ -293,37 +292,35 @@ class MetadataRequestTest extends BaseRequestTest {
     assertTrue("Response should have one topic", v0MetadataResponse.topicMetadata.size == 1)
     val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     assertTrue("PartitionMetadata should have an error", v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE)
-    assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1)
+    assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicaIds.size == replicaCount - 1)
 
     // Validate version 1 returns unavailable replicas with no error
-    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
     val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
     assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
     assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode))
     assertEquals("Response should have one topic", 1, v1MetadataResponse.topicMetadata.size)
     val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     assertEquals("PartitionMetadata should have no errors", Errors.NONE, v1PartitionMetadata.error)
-    assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
+    assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicaIds.size)
   }
 
   @Test
   def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
     def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
       val activeBrokers = servers.filter(_.brokerState.currentState != NotRunning.state)
-      val expectedIsr = activeBrokers.map { broker =>
-        new Node(broker.config.brokerId, "localhost", TestUtils.boundPort(broker), broker.config.rack.orNull)
-      }.sortBy(_.id)
+      val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
 
       // Assert that topic metadata at new brokers is updated correctly
       activeBrokers.foreach { broker =>
-        var actualIsr: Seq[Node] = Seq.empty
+        var actualIsr = Set.empty[Int]
         TestUtils.waitUntilTrue(() => {
           val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic).asJava, false).build,
             Some(brokerSocketServer(broker.config.brokerId)))
           val firstPartitionMetadata = metadataResponse.topicMetadata.asScala.headOption.flatMap(_.partitionMetadata.asScala.headOption)
           actualIsr = firstPartitionMetadata.map { partitionMetadata =>
-            partitionMetadata.isr.asScala.sortBy(_.id)
-          }.getOrElse(Seq.empty)
+            partitionMetadata.inSyncReplicaIds.asScala.map(Int.unbox).toSet
+          }.getOrElse(Set.empty)
           expectedIsr == actualIsr
         }, s"Topic metadata not updated correctly in broker $broker\n" +
           s"Expected ISR: $expectedIsr \n" +


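    With replica connection details gone from partition metadata, the downed-broker check in the
    MetadataRequestTest hunk above becomes a membership test on the response's broker list rather than a probe
    for an empty host/port on a replica Node. A one-line sketch of that pattern (the helper name is illustrative):

        import scala.collection.JavaConverters._
        import org.apache.kafka.common.requests.MetadataResponse

        // True once the downed broker has dropped out of the returned broker list
        def brokerIsGone(response: MetadataResponse, downedBrokerId: Int): Boolean =
          !response.brokers.asScala.exists(_.id == downedBrokerId)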