kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] 02/04: Rip out references to Node in PartitionMetadata
Date: Fri, 10 Jan 2020 01:04:44 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch KAFKA-9261
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 21a27cfe9fd724a702f3d095a9893784dc59caa4
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Dec 3 00:55:12 2019 -0800

    Rip out references to Node in PartitionMetadata
---
 .../java/org/apache/kafka/clients/Metadata.java    |  47 +++---
 .../org/apache/kafka/clients/MetadataCache.java    |  68 ++++-----
 .../org/apache/kafka/clients/NetworkClient.java    |   2 +-
 .../admin/internals/MetadataOperationContext.java  |   4 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   4 +-
 .../consumer/internals/ConsumerCoordinator.java    |   6 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  24 ++-
 .../kafka/common/requests/MetadataResponse.java    | 168 ++++++++-------------
 .../org/apache/kafka/clients/MetadataTest.java     |  19 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  44 +++---
 .../internals/ConsumerCoordinatorTest.java         |   2 +-
 .../consumer/internals/ConsumerMetadataTest.java   |   3 +-
 .../clients/consumer/internals/FetcherTest.java    |  28 ++--
 .../kafka/common/requests/RequestResponseTest.java |  20 +--
 .../test/java/org/apache/kafka/test/TestUtils.java |  13 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   4 +-
 .../main/scala/kafka/server/MetadataCache.scala    |  67 ++++----
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |  12 +-
 .../admin/TopicCommandWithAdminClientTest.scala    |   2 +-
 .../server/AbstractCreateTopicsRequestTest.scala   |   4 +-
 .../unit/kafka/server/MetadataCacheTest.scala      |  37 ++---
 .../unit/kafka/server/MetadataRequestTest.scala    |  41 +++--
 22 files changed, 291 insertions(+), 328 deletions(-)
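
[Editor's note, not part of the commit: a minimal sketch of the API shape after this change, assuming the classes as modified in the diff below. PartitionMetadata now carries broker ids instead of Node references, and callers resolve ids against a broker map, e.g. via the new MetadataResponse.toPartitionInfo helper. Topic name, hostnames, and ids here are made up for illustration.]

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;

public class PartitionMetadataExample {
    public static void main(String[] args) {
        // Brokers are kept in a map keyed by id, mirroring MetadataCache and MetadataResponse.Holder.
        Map<Integer, Node> nodesById = new HashMap<>();
        nodesById.put(0, new Node(0, "broker0.example", 9092));
        nodesById.put(1, new Node(1, "broker1.example", 9092));

        // After this commit, PartitionMetadata is constructed from broker ids rather than Node objects.
        PartitionMetadata metadata = new PartitionMetadata(
                Errors.NONE,
                new TopicPartition("my-topic", 0),
                0,                     // leaderId
                Optional.of(5),        // leaderEpoch
                Arrays.asList(0, 1),   // replicaIds
                Arrays.asList(0, 1),   // inSyncReplicaIds
                Arrays.asList(2));     // offlineReplicaIds (id 2 has no entry in the map)

        // Ids are resolved to Nodes only when a PartitionInfo view is needed;
        // an unknown id becomes a placeholder Node with host "" and port -1.
        PartitionInfo info = MetadataResponse.toPartitionInfo(metadata, nodesById);
        System.out.println(info);
    }
}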

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 9589739..59351ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -45,6 +45,8 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
@@ -197,16 +199,30 @@ public class Metadata implements Closeable {
     /**
      * Return the cached partition info if it exists and a newer leader epoch isn't known about.
      */
-    public synchronized Optional<MetadataResponse.PartitionMetadata> partitionMetadataIfCurrent(TopicPartition topicPartition) {
+    synchronized Optional<MetadataResponse.PartitionMetadata> partitionMetadataIfCurrent(TopicPartition topicPartition) {
         Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
+        Optional<MetadataResponse.PartitionMetadata> partitionMetadata = cache.partitionMetadata(topicPartition);
         if (epoch == null) {
             // old cluster format (no epochs)
-            return cache.partitionMetadata(topicPartition);
+            return partitionMetadata;
         } else {
-            return cache.getPartitionInfoHavingEpoch(topicPartition, epoch);
+            return partitionMetadata.filter(metadata ->
+                    metadata.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH).equals(epoch));
         }
     }
 
+    public synchronized Optional<LeaderAndEpoch> currentLeader(TopicPartition topicPartition) {
+        return partitionMetadataIfCurrent(topicPartition).flatMap(partitionMetadata -> {
+            Optional<Integer> leaderEpoch = partitionMetadata.leaderEpoch;
+            Optional<Node> leaderNodeOpt = cache.nodeById(partitionMetadata.leaderId);
+            return leaderNodeOpt.map(leaderNode -> new LeaderAndEpoch(leaderNode, leaderEpoch));
+        });
+    }
+
+    public synchronized LeaderAndEpoch currentLeaderOrEmpty(TopicPartition tp) {
+        return currentLeader(tp).orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp)));
+    }
+
     public synchronized void bootstrap(List<InetSocketAddress> addresses) {
         this.needUpdate = true;
         this.updateVersion += 1;
@@ -243,7 +259,7 @@ public class Metadata implements Closeable {
         this.lastSuccessfulRefreshMs = now;
         this.updateVersion += 1;
 
-        String previousClusterId = cache.cluster().clusterResource().clusterId();
+        String previousClusterId = cache.clusterResource().clusterId();
 
         this.cache = handleMetadataResponse(response, topic -> retainTopic(topic.topic(), topic.isInternal(), now));
 
@@ -252,11 +268,11 @@ public class Metadata implements Closeable {
 
         this.lastSeenLeaderEpochs.keySet().removeIf(tp -> !retainTopic(tp.topic(), false, now));
 
-        String newClusterId = cache.cluster().clusterResource().clusterId();
+        String newClusterId = cache.clusterResource().clusterId();
         if (!Objects.equals(previousClusterId, newClusterId)) {
             log.info("Cluster ID: {}", newClusterId);
         }
-        clusterResourceListeners.onUpdate(cache.cluster().clusterResource());
+        clusterResourceListeners.onUpdate(cache.clusterResource());
 
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
@@ -301,9 +317,9 @@ public class Metadata implements Closeable {
                     updatePartitionInfo(metadata.topic(), partitionMetadata,
                             metadataResponse.hasReliableLeaderEpochs(), partitions::add);
 
-                    if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
+                    if (partitionMetadata.error.exception() instanceof InvalidMetadataException) {
                         log.debug("Requesting metadata update for partition {} due to error {}",
-                                partitionMetadata.topicPartition, partitionMetadata.error());
+                                partitionMetadata.topicPartition, partitionMetadata.error);
                         requestUpdate();
                     }
                 }
@@ -314,7 +330,7 @@ public class Metadata implements Closeable {
         }
 
         return new MetadataCache(metadataResponse.clusterId(),
-                new ArrayList<>(metadataResponse.brokers()),
+                metadataResponse.brokersById(),
                 partitions,
                 metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
@@ -331,8 +347,8 @@ public class Metadata implements Closeable {
                                      Consumer<MetadataResponse.PartitionMetadata> partitionInfoConsumer) {
         TopicPartition tp = new TopicPartition(topic, partitionMetadata.partition());
 
-        if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch().isPresent()) {
-            int newEpoch = partitionMetadata.leaderEpoch().get();
+        if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
+            int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch, false)) {
                 partitionInfoConsumer.accept(partitionMetadata);
@@ -489,15 +505,6 @@ public class Metadata implements Closeable {
         }
     }
 
-    public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
-        return partitionMetadataIfCurrent(tp)
-                .map(partitionMetadata -> {
-                    Node leader = partitionMetadata.leader();
-                    return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, partitionMetadata.leaderEpoch());
-                })
-                .orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp)));
-    }
-
     public static class LeaderAndEpoch {
 
         public static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new LeaderAndEpoch(Node.noNode(), Optional.empty());
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index af595fe..3230087 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -17,13 +17,13 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.MetadataResponse;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,15 +33,13 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
-
 /**
  * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster
  * instance which is optimized for read access.
  */
 public class MetadataCache {
     private final String clusterId;
-    private final List<Node> nodes;
+    private final Map<Integer, Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
@@ -51,7 +49,7 @@ public class MetadataCache {
     private Cluster clusterInstance;
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
+                  Map<Integer, Node> nodes,
                   Collection<MetadataResponse.PartitionMetadata> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
@@ -60,14 +58,14 @@ public class MetadataCache {
         this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, null);
     }
 
-    MetadataCache(String clusterId,
-                  List<Node> nodes,
-                  Collection<MetadataResponse.PartitionMetadata> partitions,
-                  Set<String> unauthorizedTopics,
-                  Set<String> invalidTopics,
-                  Set<String> internalTopics,
-                  Node controller,
-                  Cluster clusterInstance) {
+    private MetadataCache(String clusterId,
+                          Map<Integer, Node> nodes,
+                          Collection<MetadataResponse.PartitionMetadata> partitions,
+                          Set<String> unauthorizedTopics,
+                          Set<String> invalidTopics,
+                          Set<String> internalTopics,
+                          Node controller,
+                          Cluster clusterInstance) {
         this.clusterId = clusterId;
         this.nodes = nodes;
         this.unauthorizedTopics = unauthorizedTopics;
@@ -87,19 +85,14 @@ public class MetadataCache {
         }
     }
 
-    /**
-     * Return the cached PartitionInfo iff it was for the given epoch
-     */
-    Optional<MetadataResponse.PartitionMetadata> getPartitionInfoHavingEpoch(TopicPartition topicPartition, int epoch) {
-        MetadataResponse.PartitionMetadata infoAndEpoch = metadataByPartition.get(topicPartition);
-        return Optional.ofNullable(infoAndEpoch)
-                .filter(infoEpoch -> infoEpoch.leaderEpoch().orElse(NO_PARTITION_LEADER_EPOCH) == epoch);
-    }
-
     Optional<MetadataResponse.PartitionMetadata> partitionMetadata(TopicPartition topicPartition) {
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Optional<Node> nodeById(int id) {
+        return Optional.ofNullable(nodes.get(id));
+    }
+
     Cluster cluster() {
         if (clusterInstance == null) {
             throw new IllegalStateException("Cached Cluster instance should not be null, but was.");
@@ -108,36 +101,33 @@ public class MetadataCache {
         }
     }
 
+    ClusterResource clusterResource() {
+        return new ClusterResource(clusterId);
+    }
+
     private void computeClusterView() {
         List<PartitionInfo> partitionInfos = metadataByPartition.values()
                 .stream()
-                .map(MetadataCache::buildPartitionInfo)
+                .map(metadata -> MetadataResponse.toPartitionInfo(metadata, nodes))
                 .collect(Collectors.toList());
-        this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics,
+        this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos, unauthorizedTopics,
                 invalidTopics, internalTopics, controller);
     }
 
-    static PartitionInfo buildPartitionInfo(MetadataResponse.PartitionMetadata metadata) {
-        return new PartitionInfo(
-                metadata.topic(),
-                metadata.partition(),
-                metadata.leader(),
-                metadata.replicas().toArray(new Node[0]),
-                metadata.isr().toArray(new Node[0]),
-                metadata.offlineReplicas().toArray(new Node[0]));
-    }
-
     static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
-        List<Node> nodes = new ArrayList<>();
+        Map<Integer, Node> nodes = new HashMap<>();
         int nodeId = -1;
-        for (InetSocketAddress address : addresses)
-            nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
+        for (InetSocketAddress address : addresses) {
+            nodes.put(nodeId, new Node(nodeId, address.getHostString(), address.getPort()));
+            nodeId--;
+        }
         return new MetadataCache(null, nodes, Collections.emptyList(),
-                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Cluster.bootstrap(addresses));
+                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
+                null, Cluster.bootstrap(addresses));
     }
 
     static MetadataCache empty() {
-        return new MetadataCache(null, Collections.emptyList(), Collections.emptyList(),
+        return new MetadataCache(null, Collections.emptyMap(), Collections.emptyList(),
                 Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Cluster.empty());
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3431b83..723c1b8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1049,7 +1049,7 @@ public class NetworkClient implements KafkaClient {
             // This could be a transient issue if listeners were added dynamically to brokers.
             List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
                 topicMetadata.partitionMetadata().stream()
-                    .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
+                    .filter(partitionMetadata -> partitionMetadata.error == Errors.LISTENER_NOT_FOUND)
                     .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
                 .collect(Collectors.toList());
             if (!missingListenerPartitions.isEmpty()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e6f4054..c05e5cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -83,8 +83,8 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
             for (PartitionMetadata pm : tm.partitionMetadata()) {
-                if (shouldRefreshMetadata(pm.error())) {
-                    throw pm.error().exception();
+                if (shouldRefreshMetadata(pm.error)) {
+                    throw pm.error.exception();
                 }
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 883a048..f6d84a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1602,7 +1602,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
                     offset,
                     Optional.empty(), // This will ensure we skip validation
-                    this.metadata.leaderAndEpoch(partition));
+                    this.metadata.currentLeaderOrEmpty(partition));
             this.subscriptions.seekUnvalidated(partition, newPosition);
         } finally {
             release();
@@ -1633,7 +1633,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             } else {
                 log.info("Seeking to offset {} for partition {}", offset, partition);
             }
-            Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.leaderAndEpoch(partition);
+            Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.currentLeaderOrEmpty(partition);
             SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
                     offsetAndMetadata.offset(),
                     offsetAndMetadata.leaderEpoch(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 22bd656..3eb69c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -766,10 +766,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 // it's possible that the partition is no longer assigned when the response is received,
                 // so we need to ignore seeking if that's the case
                 if (this.subscriptions.isAssigned(tp)) {
-                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
+                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeaderOrEmpty(tp);
                     final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
-                        offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
-                        leaderAndEpoch);
+                            offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
+                            leaderAndEpoch);
 
                     this.subscriptions.seekUnvalidated(tp, position);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ff9d738..c430797 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.ApiVersion;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -484,7 +485,7 @@ public class Fetcher<K, V> implements Closeable {
 
         // Validate each partition against the current leader and epoch
         subscriptions.assignedPartitions().forEach(topicPartition -> {
-            ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(topicPartition);
+            ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeaderOrEmpty(topicPartition);
             subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch);
         });
 
@@ -713,7 +714,7 @@ public class Fetcher<K, V> implements Closeable {
 
     private void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
         SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
-                offsetData.offset, offsetData.leaderEpoch, metadata.leaderAndEpoch(partition));
+                offsetData.offset, offsetData.leaderEpoch, metadata.currentLeaderOrEmpty(partition));
         offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch));
         subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy);
     }
@@ -901,20 +902,16 @@ public class Fetcher<K, V> implements Closeable {
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
             Long offset = entry.getValue();
-            Optional<MetadataResponse.PartitionMetadata> currentInfo = metadata.partitionMetadataIfCurrent(tp);
-            if (!currentInfo.isPresent()) {
+            Optional<Metadata.LeaderAndEpoch> leaderAndEpochOpt = metadata.currentLeader(tp);
+            if (!leaderAndEpochOpt.isPresent()) {
                 log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
                 metadata.requestUpdate();
                 partitionsToRetry.add(tp);
             } else {
-                MetadataResponse.PartitionMetadata partitionMetadata = currentInfo.get();
-                Node leader = partitionMetadata.leader();
+                Metadata.LeaderAndEpoch leaderAndEpoch = leaderAndEpochOpt.get();
+                Node leader = leaderAndEpoch.leader;
 
-                if (leader == null) {
-                    log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
-                    metadata.requestUpdate();
-                    partitionsToRetry.add(tp);
-                } else if (client.isUnavailable(leader)) {
+                if (client.isUnavailable(leader)) {
                     client.maybeThrowAuthFailure(leader);
 
                     // The connection has failed and we need to await the blackout period before we can
@@ -924,8 +921,7 @@ public class Fetcher<K, V> implements Closeable {
                             leader, tp);
                     partitionsToRetry.add(tp);
                 } else {
-                    partitionDataMap.put(tp,
-                            new ListOffsetRequest.PartitionData(offset, partitionMetadata.leaderEpoch()));
+                    partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
                 }
             }
         }
@@ -1102,7 +1098,7 @@ public class Fetcher<K, V> implements Closeable {
 
         // Ensure the position has an up-to-date leader
         subscriptions.assignedPartitions().forEach(
-            tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp)));
+            tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.currentLeaderOrEmpty(tp)));
 
         long currentTimeMs = time.milliseconds();
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 16ec87d..8863e25 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -132,12 +133,13 @@ public class MetadataResponse extends AbstractResponse {
     public Cluster cluster() {
         Set<String> internalTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
+
         for (TopicMetadata metadata : topicMetadata()) {
             if (metadata.error == Errors.NONE) {
                 if (metadata.isInternal)
                     internalTopics.add(metadata.topic);
                 for (PartitionMetadata partitionMetadata : metadata.partitionMetadata) {
-                    partitions.add(partitionMetaToInfo(metadata.topic, partitionMetadata));
+                    partitions.add(toPartitionInfo(partitionMetadata, holder().brokers));
                 }
             }
         }
@@ -145,6 +147,24 @@ public class MetadataResponse extends AbstractResponse {
                 topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller());
     }
 
+    public static PartitionInfo toPartitionInfo(PartitionMetadata metadata, Map<Integer, Node> nodesById) {
+        return new PartitionInfo(metadata.topic(),
+                metadata.partition(),
+                nodesById.get(metadata.leaderId),
+                convertToNodeArray(metadata.replicaIds, nodesById),
+                convertToNodeArray(metadata.inSyncReplicaIds, nodesById),
+                convertToNodeArray(metadata.offlineReplicaIds, nodesById));
+    }
+
+    private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
+        return replicaIds.stream().map(replicaId -> {
+            Node node = nodesById.get(replicaId);
+            if (node == null)
+                return new Node(replicaId, "", -1);
+            return node;
+        }).toArray(Node[]::new);
+    }
+
     /**
      * Returns a 32-bit bitfield to represent authorized operations for this topic.
      */
@@ -163,19 +183,6 @@ public class MetadataResponse extends AbstractResponse {
         return data.clusterAuthorizedOperations();
     }
 
-    /**
-     * Transform a topic and PartitionMetadata into PartitionInfo.
-     */
-    public static PartitionInfo partitionMetaToInfo(String topic, PartitionMetadata partitionMetadata) {
-        return new PartitionInfo(
-                topic,
-                partitionMetadata.partition(),
-                partitionMetadata.leader(),
-                partitionMetadata.replicas().toArray(new Node[0]),
-                partitionMetadata.isr().toArray(new Node[0]),
-                partitionMetadata.offlineReplicas().toArray(new Node[0]));
-    }
-
     private Holder holder() {
         if (holder == null) {
             synchronized (data) {
@@ -191,6 +198,10 @@ public class MetadataResponse extends AbstractResponse {
      * @return the brokers
      */
     public Collection<Node> brokers() {
+        return holder().brokers.values();
+    }
+
+    public Map<Integer, Node> brokersById() {
         return holder().brokers;
     }
 
@@ -316,42 +327,27 @@ public class MetadataResponse extends AbstractResponse {
     // This is used to describe per-partition state in the MetadataResponse
     public static class PartitionMetadata {
         public final TopicPartition topicPartition;
-
-        private final Errors error;
-        private final Node leader;
-        private final Optional<Integer> leaderEpoch;
-        private final List<Node> replicas;
-        private final List<Node> isr;
-        private final List<Node> offlineReplicas;
+        public final Errors error;
+        public final int leaderId;
+        public final Optional<Integer> leaderEpoch;
+        public final List<Integer> replicaIds;
+        public final List<Integer> inSyncReplicaIds;
+        public final List<Integer> offlineReplicaIds;
 
         public PartitionMetadata(Errors error,
                                  TopicPartition topicPartition,
-                                 Node leader,
+                                 int leaderId,
                                  Optional<Integer> leaderEpoch,
-                                 List<Node> replicas,
-                                 List<Node> isr,
-                                 List<Node> offlineReplicas) {
+                                 List<Integer> replicaIds,
+                                 List<Integer> inSyncReplicaIds,
+                                 List<Integer> offlineReplicaIds) {
             this.error = error;
             this.topicPartition = topicPartition;
-            this.leader = leader;
+            this.leaderId = leaderId;
             this.leaderEpoch = leaderEpoch;
-            this.replicas = replicas;
-            this.isr = isr;
-            this.offlineReplicas = offlineReplicas;
-        }
-
-        public PartitionMetadata withoutLeaderEpoch() {
-            return new PartitionMetadata(error,
-                    topicPartition,
-                    leader,
-                    Optional.empty(),
-                    replicas,
-                    isr,
-                    offlineReplicas);
-        }
-
-        public Errors error() {
-            return error;
+            this.replicaIds = replicaIds;
+            this.inSyncReplicaIds = inSyncReplicaIds;
+            this.offlineReplicaIds = offlineReplicaIds;
         }
 
         public int partition() {
@@ -362,28 +358,14 @@ public class MetadataResponse extends AbstractResponse {
             return topicPartition.topic();
         }
 
-        public int leaderId() {
-            return leader == null ? -1 : leader.id();
-        }
-
-        public Optional<Integer> leaderEpoch() {
-            return leaderEpoch;
-        }
-
-        public Node leader() {
-            return leader;
-        }
-
-        public List<Node> replicas() {
-            return replicas;
-        }
-
-        public List<Node> isr() {
-            return isr;
-        }
-
-        public List<Node> offlineReplicas() {
-            return offlineReplicas;
+        public PartitionMetadata withoutLeaderEpoch() {
+            return new PartitionMetadata(error,
+                    topicPartition,
+                    leaderId,
+                    Optional.empty(),
+                    replicaIds,
+                    inSyncReplicaIds,
+                    offlineReplicaIds);
         }
 
         @Override
@@ -391,32 +373,31 @@ public class MetadataResponse extends AbstractResponse {
             return "PartitionMetadata(" +
                     ", error=" + error +
                     ", partition=" + topicPartition +
-                    ", leader=" + leader +
+                    ", leader=" + leaderId +
                     ", leaderEpoch=" + leaderEpoch +
-                    ", replicas=" + Utils.join(replicas, ",") +
-                    ", isr=" + Utils.join(isr, ",") +
-                    ", offlineReplicas=" + Utils.join(offlineReplicas, ",") + ')';
+                    ", replicas=" + Utils.join(replicaIds, ",") +
+                    ", isr=" + Utils.join(inSyncReplicaIds, ",") +
+                    ", offlineReplicas=" + Utils.join(offlineReplicaIds, ",") + ')';
         }
     }
 
     private static class Holder {
-        private final Collection<Node> brokers;
+        private final Map<Integer, Node> brokers;
         private final Node controller;
         private final Collection<TopicMetadata> topicMetadata;
 
         Holder(MetadataResponseData data) {
-            this.brokers = Collections.unmodifiableCollection(createBrokers(data));
-            Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
-            this.topicMetadata = createTopicMetadata(data, brokerMap);
-            this.controller = brokerMap.get(data.controllerId());
+            this.brokers = Collections.unmodifiableMap(createBrokers(data));
+            this.topicMetadata = createTopicMetadata(data);
+            this.controller = brokers.get(data.controllerId());
         }
 
-        private Collection<Node> createBrokers(MetadataResponseData data) {
-            return data.brokers().valuesList().stream().map(b ->
-                    new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
+        private Map<Integer, Node> createBrokers(MetadataResponseData data) {
+            return data.brokers().valuesList().stream().map(b -> new Node(b.nodeId(), b.host(), b.port(), b.rack()))
+                    .collect(Collectors.toMap(Node::id, Function.identity()));
         }
 
-        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data) {
             List<TopicMetadata> topicMetadataList = new ArrayList<>();
             for (MetadataResponseTopic topicMetadata : data.topics()) {
                 Errors topicError = Errors.forCode(topicMetadata.errorCode());
@@ -429,13 +410,10 @@ public class MetadataResponse extends AbstractResponse {
                     int partitionIndex = partitionMetadata.partitionIndex();
                     int leader = partitionMetadata.leaderId();
                     Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
-                    Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
-                    List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
-                    List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
-                    List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
                     TopicPartition topicPartition = new TopicPartition(topic, partitionIndex);
-                    partitionMetadataList.add(new PartitionMetadata(partitionError, topicPartition, leaderNode,
-                            leaderEpoch, replicaNodes, isrNodes, offlineNodes));
+                    partitionMetadataList.add(new PartitionMetadata(partitionError, topicPartition, leader,
+                            leaderEpoch, partitionMetadata.replicaNodes(), partitionMetadata.isrNodes(),
+                            partitionMetadata.offlineReplicas()));
                 }
 
                 topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
@@ -444,18 +422,6 @@ public class MetadataResponse extends AbstractResponse {
             return topicMetadataList;
         }
 
-        private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
-            List<Node> nodes = new ArrayList<>(brokerIds.size());
-            for (Integer brokerId : brokerIds) {
-                Node node = brokers.get(brokerId);
-                if (node == null)
-                    nodes.add(new Node(brokerId, "", -1));
-                else
-                    nodes.add(node);
-            }
-            return nodes;
-        }
-
     }
 
     public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
@@ -487,11 +453,11 @@ public class MetadataResponse extends AbstractResponse {
                 metadataResponseTopic.partitions().add(new MetadataResponsePartition()
                     .setErrorCode(partitionMetadata.error.code())
                     .setPartitionIndex(partitionMetadata.partition())
-                    .setLeaderId(partitionMetadata.leader == null ? -1 : partitionMetadata.leader.id())
-                    .setLeaderEpoch(partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-                    .setReplicaNodes(partitionMetadata.replicas.stream().map(Node::id).collect(Collectors.toList()))
-                    .setIsrNodes(partitionMetadata.isr.stream().map(Node::id).collect(Collectors.toList()))
-                    .setOfflineReplicas(partitionMetadata.offlineReplicas.stream().map(Node::id).collect(Collectors.toList())));
+                    .setLeaderId(partitionMetadata.leaderId)
+                    .setLeaderEpoch(partitionMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    .setReplicaNodes(partitionMetadata.replicaIds)
+                    .setIsrNodes(partitionMetadata.inSyncReplicaIds)
+                    .setOfflineReplicas(partitionMetadata.offlineReplicaIds));
             }
             responseData.topics().add(metadataResponseTopic);
         });
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 440ee55..6cd73fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -43,10 +43,8 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -198,7 +196,7 @@ public class MetadataTest {
             metadata.update(response, 100);
             assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
             MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
-            assertEquals(Optional.empty(), metadata.leaderEpoch());
+            assertEquals(Optional.empty(), metadata.leaderEpoch);
         }
 
         for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
@@ -208,7 +206,7 @@ public class MetadataTest {
             metadata.update(response, 100);
             assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
             MetadataResponse.PartitionMetadata info = metadata.partitionMetadataIfCurrent(tp).get();
-            assertEquals(Optional.of(10), info.leaderEpoch());
+            assertEquals(Optional.of(10), info.leaderEpoch);
         }
     }
 
@@ -258,10 +256,8 @@ public class MetadataTest {
         assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
         MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
 
-        List<Integer> cachedIsr = metadata.isr().stream()
-                .map(Node::id).collect(Collectors.toList());
-        assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
-        assertEquals(Optional.of(10), metadata.leaderEpoch());
+        assertEquals(Arrays.asList(1, 2, 3), metadata.inSyncReplicaIds);
+        assertEquals(Optional.of(10), metadata.leaderEpoch);
     }
 
     @Test
@@ -452,7 +448,7 @@ public class MetadataTest {
         // still works
         assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.partitionMetadataIfCurrent(tp).get().partition(), 0);
-        assertEquals(metadata.partitionMetadataIfCurrent(tp).get().leader().id(), 0);
+        assertEquals(metadata.partitionMetadataIfCurrent(tp).get().leaderId, 0);
     }
 
     @Test
@@ -611,8 +607,9 @@ public class MetadataTest {
 
         MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 2, Collections.emptyMap(), partitionCounts, _tp -> 99,
             (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
-                new MetadataResponse.PartitionMetadata(error, partition, node0, leaderEpoch,
-                    Collections.singletonList(node0), Collections.emptyList(), Collections.singletonList(node1)));
+                new MetadataResponse.PartitionMetadata(error, partition, node0.id(), leaderEpoch,
+                    Collections.singletonList(node0.id()), Collections.emptyList(),
+                        Collections.singletonList(node1.id())));
         metadata.update(emptyMetadataResponse(), 0L);
         metadata.update(metadataResponse, 10L);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 913166b..ef17824 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -108,11 +108,11 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
-import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
-import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
 import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -340,11 +340,11 @@ public class KafkaAdminClientTest {
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 PartitionMetadata pm = new PartitionMetadata(error,
                         new TopicPartition(topic, pInfo.partition()),
-                        pInfo.leader(),
+                        pInfo.leader().id(),
                         Optional.of(234),
-                        Arrays.asList(pInfo.replicas()),
-                        Arrays.asList(pInfo.inSyncReplicas()),
-                        Arrays.asList(pInfo.offlineReplicas()));
+                        Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()),
+                        Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()),
+                        Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
                 pms.add(pm);
             }
             TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
@@ -606,8 +606,8 @@ public class KafkaAdminClientTest {
             // Then we respond to the DescribeTopic request
             Node leader = initializedCluster.nodes().get(0);
             MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
-                    Errors.NONE, new TopicPartition(topic, 0), leader, Optional.of(10),
-                    singletonList(leader), singletonList(leader), singletonList(leader));
+                    Errors.NONE, new TopicPartition(topic, 0), leader.id(), Optional.of(10),
+                    singletonList(leader.id()), singletonList(leader.id()), singletonList(leader.id()));
             env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
                     initializedCluster.clusterResource().clusterId(), 1,
                     singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
@@ -982,11 +982,11 @@ public class KafkaAdminClientTest {
             List<Node> nodes = env.cluster().nodes();
 
             List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0, nodes.get(0),
-                    Optional.of(5), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0, nodes.get(0).id(),
+                    Optional.of(5), singletonList(nodes.get(0).id()), singletonList(nodes.get(0).id()),
                     Collections.emptyList()));
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1, nodes.get(1),
-                    Optional.of(5), singletonList(nodes.get(1)), singletonList(nodes.get(1)), Collections.emptyList()));
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1, nodes.get(1).id(),
+                    Optional.of(5), singletonList(nodes.get(1).id()), singletonList(nodes.get(1).id()), Collections.emptyList()));
 
             List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false, partitionMetadata));
@@ -1046,17 +1046,17 @@ public class KafkaAdminClientTest {
 
             List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
             List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition0, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition1, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, myTopicPartition2, null,
-                    Optional.empty(), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition0, nodes.get(0).id(), Optional.of(5),
+                    singletonList(nodes.get(0).id()), singletonList(nodes.get(0).id()), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition1, nodes.get(0).id(), Optional.of(5),
+                    singletonList(nodes.get(0).id()), singletonList(nodes.get(0).id()), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, myTopicPartition2, -1,
+                    Optional.empty(), singletonList(nodes.get(0).id()), singletonList(nodes.get(0).id()),
                     Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition3, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition4, nodes.get(0), Optional.of(5),
-                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition3, nodes.get(0).id(), Optional.of(5),
+                    singletonList(nodes.get(0).id()), singletonList(nodes.get(0).id()), Collections.emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition4, nodes.get(0).id(), Optional.of(5),
+                    singletonList(nodes.get(0).id()), singletonList(nodes.get(0).id()), Collections.emptyList()));
 
             t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index faec501..5d1f1e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1353,7 +1353,7 @@ public class ConsumerCoordinatorTest {
             Node node = new Node(0, "localhost", 9999);
             MetadataResponse.PartitionMetadata partitionMetadata =
                 new MetadataResponse.PartitionMetadata(Errors.NONE, new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
-                        node, Optional.empty(), singletonList(node), singletonList(node), singletonList(node));
+                        node.id(), Optional.empty(), singletonList(node.id()), singletonList(node.id()), singletonList(node.id()));
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
                 Topic.GROUP_METADATA_TOPIC_NAME, true, singletonList(partitionMetadata));
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index ce90089..5757a24 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -157,7 +157,8 @@ public class ConsumerMetadataTest {
 
     private MetadataResponse.TopicMetadata topicMetadata(String topic, boolean isInternal) {
         MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE,
-                new TopicPartition(topic, 0), node, Optional.of(5), singletonList(node), singletonList(node), singletonList(node));
+                new TopicPartition(topic, 0), node.id(), Optional.of(5),
+                singletonList(node.id()), singletonList(node.id()), singletonList(node.id()));
         return new MetadataResponse.TopicMetadata(Errors.NONE, topic, isInternal, singletonList(partitionMetadata));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ebad2e0..5d0ba38 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1946,14 +1946,14 @@ public class FetcherTest {
             List<MetadataResponse.PartitionMetadata> altPartitions = new ArrayList<>();
             for (MetadataResponse.PartitionMetadata p : partitions) {
                 altPartitions.add(new MetadataResponse.PartitionMetadata(
-                    p.error(),
-                    new TopicPartition(item.topic(), p.partition()),
-                    null, //no leader
+                    p.error,
+                    p.topicPartition,
+                    -1, //no leader
                     Optional.empty(),
-                    p.replicas(),
-                    p.isr(),
-                    p.offlineReplicas())
-                );
+                    p.replicaIds,
+                    p.inSyncReplicaIds,
+                    p.offlineReplicaIds
+                ));
             }
             MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata(
                 item.error(),
@@ -2990,8 +2990,8 @@ public class FetcherTest {
 
         List<ConsumerRecord<byte[], byte[]>> records;
         assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
-        subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.leaderAndEpoch(tp0)));
-        subscriptions.seekValidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1)));
+        subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.currentLeaderOrEmpty(tp0)));
+        subscriptions.seekValidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeaderOrEmpty(tp1)));
 
         // Fetch some records and establish an incremental fetch session.
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions1 = new LinkedHashMap<>();
@@ -3470,7 +3470,7 @@ public class FetcherTest {
 
         // Seek with a position and leader+epoch
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+                metadata.currentLeaderOrEmpty(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
         assertFalse(client.isConnected(node.idString()));
         assertTrue(subscriptions.awaitingValidation(tp0));
@@ -3519,7 +3519,7 @@ public class FetcherTest {
 
         // Seek with a position and leader+epoch
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+                metadata.currentLeaderOrEmpty(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         // Update metadata to epoch=2, enter validation
@@ -3547,7 +3547,7 @@ public class FetcherTest {
         Node node = metadata.fetch().nodes().get(0);
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
-        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.currentLeaderOrEmpty(tp0).leader, Optional.of(epochOne));
         subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         fetcher.validateOffsetsIfNeeded();
@@ -3590,7 +3590,7 @@ public class FetcherTest {
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
         // Seek with a position and leader+epoch
-        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.currentLeaderOrEmpty(tp0).leader, Optional.of(epochOne));
         subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
 
         // Update metadata to epoch=2, enter validation
@@ -3660,7 +3660,7 @@ public class FetcherTest {
         apiVersions.update(node.idString(), NodeApiVersions.create());
 
         // Seek
-        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(1));
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.currentLeaderOrEmpty(tp0).leader, Optional.of(1));
         subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch));
 
         // Check for truncation, this should cause tp0 to go into validation
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e5bd1df..8bebaa5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -161,6 +161,8 @@ import java.util.Optional;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.protocol.ApiKeys.FETCH;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.apache.kafka.test.TestUtils.toBuffer;
@@ -667,7 +669,7 @@ public class RequestResponseTest {
         responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData<>(Errors.NONE, 900000,
                 5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
         responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), Collections.emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), emptyList(), records));
 
         FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -1163,20 +1165,20 @@ public class RequestResponseTest {
 
     private MetadataResponse createMetadataResponse() {
         Node node = new Node(1, "host1", 1001);
-        List<Node> replicas = asList(node);
-        List<Node> isr = asList(node);
-        List<Node> offlineReplicas = asList();
+        List<Integer> replicas = singletonList(node.id());
+        List<Integer> isr = singletonList(node.id());
+        List<Integer> offlineReplicas = emptyList();
 
         List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
                 asList(new MetadataResponse.PartitionMetadata(Errors.NONE,
-                        new TopicPartition("__consumer_offsets", 1), node, Optional.of(5),
+                        new TopicPartition("__consumer_offsets", 1), node.id(), Optional.of(5),
                         replicas, isr, offlineReplicas))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
-                Collections.emptyList()));
+                emptyList()));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false,
             asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
-                    new TopicPartition("topic3", 0), null,
+                    new TopicPartition("topic3", 0), -1,
                     Optional.empty(), replicas, isr, offlineReplicas))));
 
         return MetadataResponse.prepareResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
@@ -1725,13 +1727,13 @@ public class RequestResponseTest {
         Map<ConfigResource, Collection<String>> resources = new HashMap<>();
         resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), asList("foo", "bar"));
         resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), null);
-        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic a"), Collections.emptyList());
+        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic a"), emptyList());
         return new DescribeConfigsRequest.Builder(resources).build((short) version);
     }
 
     private DescribeConfigsResponse createDescribeConfigsResponse() {
         Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>();
-        List<DescribeConfigsResponse.ConfigSynonym> synonyms = Collections.emptyList();
+        List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList();
         List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
                 new DescribeConfigsResponse.ConfigEntry("config_name", "config_value",
                         DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms),
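With Node references removed, PartitionMetadata is built from broker ids. A minimal sketch of the construction pattern used in createMetadataResponse() above, assuming the argument order shown in the hunk, is:

    // Sketch only; leader and replica sets are plain broker ids, not Node objects.
    // Assumes java.util.List, java.util.Optional and the Collections static imports above.
    Node node = new Node(1, "host1", 1001);
    List<Integer> replicas = singletonList(node.id());
    List<Integer> isr = singletonList(node.id());
    List<Integer> offline = emptyList();

    MetadataResponse.PartitionMetadata partition = new MetadataResponse.PartitionMetadata(
            Errors.NONE,
            new TopicPartition("__consumer_offsets", 1),
            node.id(),            // leaderId; -1 when no leader is known
            Optional.of(5),       // leader epoch
            replicas, isr, offline);
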
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 916feea..9c5e351 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -158,9 +158,10 @@ public class TestUtils {
             for (int i = 0; i < numPartitions; i++) {
                 TopicPartition tp = new TopicPartition(topic, i);
                 Node leader = nodes.get(i % nodes.size());
-                List<Node> replicas = Collections.singletonList(leader);
+                List<Integer> replicas = Collections.singletonList(leader.id());
                 partitionMetadata.add(partitionSupplier.supply(
-                        Errors.NONE, tp, leader, Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
+                        Errors.NONE, tp, leader.id(), Optional.ofNullable(epochSupplier.apply(tp)),
+                        replicas, replicas, replicas));
             }
 
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
@@ -180,11 +181,11 @@ public class TestUtils {
     public interface PartitionMetadataSupplier {
         MetadataResponse.PartitionMetadata supply(Errors error,
                               TopicPartition partition,
-                              Node leader,
+                              int leaderId,
                               Optional<Integer> leaderEpoch,
-                              List<Node> replicas,
-                              List<Node> isr,
-                              List<Node> offlineReplicas);
+                              List<Integer> replicas,
+                              List<Integer> isr,
+                              List<Integer> offlineReplicas);
     }
 
     public static Cluster clusterWith(final int nodes, final String topic, final int partitions) {
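Since PartitionMetadataSupplier has a single abstract method, tests can provide it as a lambda. A sketch against the id-based signature above (argument order assumed to match the hunk) is:

    // Sketch only; forwards the id-based arguments straight into the new constructor.
    TestUtils.PartitionMetadataSupplier supplier =
            (error, partition, leaderId, leaderEpoch, replicas, isr, offlineReplicas) ->
                    new MetadataResponse.PartitionMetadata(error, partition, leaderId,
                            leaderEpoch, replicas, isr, offlineReplicas);
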
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f252191..2189bfd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1288,8 +1288,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else {
           val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
             .find(_.partition == partition)
-            .map(_.leader)
-            .flatMap(p => Option(p))
+            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
+            .flatMap(_.getNode(request.context.listenerName))
 
           coordinatorEndpoint match {
             case Some(endpoint) if !endpoint.isEmpty =>
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 1ad0e41..c2ac2b7 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.{Collections, Optional}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -57,16 +58,20 @@ class MetadataCache(brokerId: Int) extends Logging {
   // we should be careful about adding additional logic here. Relatedly, `brokers` is
   // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
   // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
-    val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
-    brokers.foreach { brokerId =>
-      val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
-        case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
-        case Some(node) => Some(node)
+  private def maybeFilterAliveReplicas(snapshot: MetadataSnapshot,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(snapshot.aliveBrokers.size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (getAliveEndpoint(snapshot, brokerId, listenerName).isDefined)
+          res.add(brokerId)
       }
-      endpoint.foreach(result +=)
+      res
     }
-    result
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
@@ -80,40 +85,42 @@ class MetadataCache(brokerId: Int) extends Logging {
         val leaderBrokerId = partitionState.leader
         val leaderEpoch = partitionState.leaderEpoch
         val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
-        val replicas = partitionState.replicas.asScala
-        val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
-        val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
 
-        val isr = partitionState.isr.asScala
-        val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
+        val replicas = partitionState.replicas
+        val filteredReplicas = maybeFilterAliveReplicas(snapshot, replicas, listenerName, errorUnavailableEndpoints)
+
+        val isr = partitionState.isr
+        val filteredIsr = maybeFilterAliveReplicas(snapshot, isr, listenerName, errorUnavailableEndpoints)
+
+        val offlineReplicas = partitionState.offlineReplicas
+
         maybeLeader match {
           case None =>
             val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
               debug(s"Error while fetching metadata for $topicPartition: leader not available")
               Errors.LEADER_NOT_AVAILABLE
             } else {
-              debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
+              debug(s"Error while fetching metadata for $topicPartition: listener $listenerName " +
+                s"not found on leader $leaderBrokerId")
               if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
             }
-            new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(),
-              Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
-              offlineReplicaInfo.asJava)
+            new MetadataResponse.PartitionMetadata(error, topicPartition, -1,
+              Optional.empty(), filteredReplicas, filteredIsr, offlineReplicas)
 
           case Some(leader) =>
-            if (replicaInfo.size < replicas.size) {
+            if (filteredReplicas.size < replicas.size) {
               debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
-                s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
-
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
-                Optional.empty(), replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
-            } else if (isrInfo.size < isr.size) {
+                s"following brokers ${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition, leader.id,
+                Optional.empty(), filteredReplicas, filteredIsr, offlineReplicas)
+            } else if (filteredIsr.size < isr.size) {
               debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
-                s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
-                Optional.empty(), replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
+                s"following brokers ${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition, leader.id,
+                Optional.empty(), filteredReplicas, filteredIsr, offlineReplicas)
             } else {
-              new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId.toInt, leader, Optional.of(leaderEpoch),
-                replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
+              new MetadataResponse.PartitionMetadata(Errors.NONE, topicPartition, leader.id, Optional.of(leaderEpoch),
+                filteredReplicas, filteredIsr, offlineReplicas)
             }
         }
       }
@@ -126,7 +133,9 @@ class MetadataCache(brokerId: Int) extends Logging {
     snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false,
+  def getTopicMetadata(topics: Set[String],
+                       listenerName: ListenerName,
+                       errorUnavailableEndpoints: Boolean = false,
                        errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
     val snapshot = metadataSnapshot
     topics.toSeq.flatMap { topic =>
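For illustration only, a Java paraphrase of the maybeFilterAliveReplicas helper above; the predicate name is hypothetical and stands in for getAliveEndpoint(snapshot, brokerId, listenerName).isDefined:

    // Java paraphrase of the Scala helper above, not broker code.
    // Requires java.util.List, java.util.ArrayList, java.util.function.Predicate.
    static List<Integer> maybeFilterAliveReplicas(List<Integer> brokers,
                                                  boolean filterUnavailableEndpoints,
                                                  Predicate<Integer> isAliveOnListener) {
        if (!filterUnavailableEndpoints)
            return brokers;                        // newer protocol versions keep offline replica ids
        List<Integer> alive = new ArrayList<>(brokers.size());
        for (Integer brokerId : brokers) {
            if (isAliveOnListener.test(brokerId))  // drop ids with no endpoint on this listener (v0 behaviour)
                alive.add(brokerId);
        }
        return alive;
    }
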
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index efd721d..c3f1cc8 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -101,9 +101,9 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(partitions.size, 3)
     assertEquals(1, partitions(1).partition)
     assertEquals(2, partitions(2).partition)
-    val replicas = partitions(1).replicas
+    val replicas = partitions(1).replicaIds
     assertEquals(replicas.size, 2)
-    assertTrue(replicas.contains(partitions(1).leader))
+    assertTrue(replicas.contains(partitions(1).leaderId))
   }
 
   @Test
@@ -131,10 +131,9 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(0, partitionMetadata(0).partition)
     assertEquals(1, partitionMetadata(1).partition)
     assertEquals(2, partitionMetadata(2).partition)
-    val replicas = partitionMetadata(1).replicas
+    val replicas = partitionMetadata(1).replicaIds
     assertEquals(2, replicas.size)
-    assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
-    assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
+    assertEquals(Set(0, 1), replicas.asScala.toSet)
   }
 
   @Test
@@ -185,9 +184,8 @@ class AddPartitionsTest extends BaseRequestTest {
     assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
     val partition = partitionOpt.get
 
-    assertNotNull("Partition leader should exist", partition.leader)
     assertEquals("Partition leader id should match", expectedLeaderId, partition.leaderId)
-    assertEquals("Replica set should match", expectedReplicas, partition.replicas.asScala.map(_.id).toSet)
+    assertEquals("Replica set should match", expectedReplicas, partition.replicaIds.asScala.toSet)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index b110139..5281d80 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -584,7 +584,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
               testPartitionMetadata match {
                 case None => fail(s"Partition metadata is not found in metadata cache")
                 case Some(metadata) => {
-                  result && metadata.error() == Errors.LEADER_NOT_AVAILABLE
+                  result && metadata.error == Errors.LEADER_NOT_AVAILABLE
                 }
               }
             }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 5f1f671..8ca2c70 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -131,9 +131,9 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
 
           if (replication == -1) {
             assertEquals("The topic should have the default replication factor",
-              configs.head.defaultReplicationFactor, metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+              configs.head.defaultReplicationFactor, metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
           } else {
-            assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+            assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
           }
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index a42a86a..531792f 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -128,16 +128,18 @@ class MetadataCacheTest {
         partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, partitionId) =>
           assertEquals(Errors.NONE, partitionMetadata.error)
           assertEquals(partitionId, partitionMetadata.partition)
-          val leader = partitionMetadata.leader
           val partitionState = topicPartitionStates.find(_.partitionIndex == partitionId).getOrElse(
             Assertions.fail(s"Unable to find partition state for partition $partitionId"))
-          assertEquals(partitionState.leader, leader.id)
+          assertEquals(partitionState.leader, partitionMetadata.leaderId)
           assertEquals(Optional.of(partitionState.leaderEpoch), partitionMetadata.leaderEpoch)
-          assertEquals(partitionState.isr, partitionMetadata.isr.asScala.map(_.id).asJava)
-          assertEquals(partitionState.replicas, partitionMetadata.replicas.asScala.map(_.id).asJava)
-          val endpoint = endpoints(partitionMetadata.leader.id).find(_.listener == listenerName.value).get
-          assertEquals(endpoint.host, leader.host)
-          assertEquals(endpoint.port, leader.port)
+          assertEquals(partitionState.isr, partitionMetadata.inSyncReplicaIds)
+          assertEquals(partitionState.replicas, partitionMetadata.replicaIds)
+
+            // TODO: Figure out what to do with these checks which no longer make sense
+//          val endpoint = endpoints(partitionMetadata.leaderId).find(_.listener == listenerName.value).get
+//          val leader = partitionMetadata.leader
+//          assertEquals(endpoint.host, leader.host)
+//          assertEquals(endpoint.port, leader.port)
         }
       }
 
@@ -267,9 +269,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(expectedError, partitionMetadata.error)
-    assertFalse(partitionMetadata.isr.isEmpty)
-    assertEquals(1, partitionMetadata.replicas.size)
-    assertEquals(0, partitionMetadata.replicas.get(0).id)
+    assertFalse(partitionMetadata.inSyncReplicaIds.isEmpty)
+    assertEquals(List(0), partitionMetadata.replicaIds.asScala)
   }
 
   @Test
@@ -326,8 +327,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(Errors.NONE, partitionMetadata.error)
-    assertEquals(Set(0, 1), partitionMetadata.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0, 1), partitionMetadata.replicaIds.asScala.toSet)
+    assertEquals(Set(0), partitionMetadata.inSyncReplicaIds.asScala.toSet)
 
     // Validate errorUnavailableEndpoints = true
     val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true)
@@ -342,8 +343,8 @@ class MetadataCacheTest {
     val partitionMetadataWithError = partitionMetadatasWithError.get(0)
     assertEquals(0, partitionMetadataWithError.partition)
     assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
-    assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+    assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
   }
 
   @Test
@@ -400,8 +401,8 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(Errors.NONE, partitionMetadata.error)
-    assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadata.replicaIds.asScala.toSet)
+    assertEquals(Set(0, 1), partitionMetadata.inSyncReplicaIds.asScala.toSet)
 
     // Validate errorUnavailableEndpoints = true
     val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true)
@@ -416,8 +417,8 @@ class MetadataCacheTest {
     val partitionMetadataWithError = partitionMetadatasWithError.get(0)
     assertEquals(0, partitionMetadataWithError.partition)
     assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
-    assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
-    assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+    assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+    assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index aa6a53f..3e36392 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -21,7 +21,6 @@ import java.util.Properties
 
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
-import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataRequestData
@@ -202,8 +201,8 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(1, topicMetadata1.partitionMetadata.size)
     val partitionMetadata = topicMetadata1.partitionMetadata.asScala.head
     assertEquals(0, partitionMetadata.partition)
-    assertEquals(2, partitionMetadata.replicas.size)
-    assertNotNull(partitionMetadata.leader)
+    assertEquals(2, partitionMetadata.replicaIds.size)
+    assertTrue(partitionMetadata.leaderId >= 0)
   }
 
   @Test
@@ -243,9 +242,9 @@ class MetadataRequestTest extends BaseRequestTest {
       assertEquals(Set(0, 1), topicMetadata.partitionMetadata.asScala.map(_.partition).toSet)
       topicMetadata.partitionMetadata.asScala.foreach { partitionMetadata =>
         val assignment = replicaAssignment(partitionMetadata.partition)
-        assertEquals(assignment, partitionMetadata.replicas.asScala.map(_.id))
-        assertEquals(assignment, partitionMetadata.isr.asScala.map(_.id))
-        assertEquals(assignment.head, partitionMetadata.leader.id)
+        assertEquals(assignment, partitionMetadata.replicaIds.asScala)
+        assertEquals(assignment, partitionMetadata.inSyncReplicaIds.asScala)
+        assertEquals(assignment.head, partitionMetadata.leaderId)
       }
     }
   }
@@ -268,21 +267,19 @@ class MetadataRequestTest extends BaseRequestTest {
     createTopic(replicaDownTopic, 1, replicaCount)
 
     // Kill a replica node that is not the leader
-    val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+    val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
     val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
       val serverId = server.dataPlaneRequestProcessor.brokerId
-      val leaderId = partitionMetadata.leader.id
-      val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
+      val leaderId = partitionMetadata.leaderId
+      val replicaIds = partitionMetadata.replicaIds.asScala
       serverId != leaderId && replicaIds.contains(serverId)
     }.get
     downNode.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      val response = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
-      val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
-      val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneRequestProcessor.brokerId).get
-      replica.host == "" & replica.port == -1
+      val response = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
+      !response.brokers.asScala.exists(_.id == downNode.dataPlaneRequestProcessor.brokerId)
     }, "Replica was not found down", 5000)
 
     // Validate version 0 still filters unavailable replicas and contains error
@@ -293,37 +290,35 @@ class MetadataRequestTest extends BaseRequestTest {
     assertTrue("Response should have one topic", v0MetadataResponse.topicMetadata.size == 1)
     val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     assertTrue("PartitionMetadata should have an error", v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE)
-    assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1)
+    assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicaIds.size == replicaCount - 1)
 
-    // Validate version 1 returns unavailable replicas with no error
-    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+    // Validate latest version returns unavailable replicas with no error
+    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
     val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
     assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
     assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode))
     assertEquals("Response should have one topic", 1, v1MetadataResponse.topicMetadata.size)
     val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     assertEquals("PartitionMetadata should have no errors", Errors.NONE, v1PartitionMetadata.error)
-    assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
+    assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicaIds.size)
   }
 
   @Test
   def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
     def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
       val activeBrokers = servers.filter(_.brokerState.currentState != NotRunning.state)
-      val expectedIsr = activeBrokers.map { broker =>
-        new Node(broker.config.brokerId, "localhost", TestUtils.boundPort(broker), broker.config.rack.orNull)
-      }.sortBy(_.id)
+      val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
 
       // Assert that topic metadata at new brokers is updated correctly
       activeBrokers.foreach { broker =>
-        var actualIsr: Seq[Node] = Seq.empty
+        var actualIsr = Set.empty[Int]
         TestUtils.waitUntilTrue(() => {
           val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic).asJava, false).build,
             Some(brokerSocketServer(broker.config.brokerId)))
           val firstPartitionMetadata = metadataResponse.topicMetadata.asScala.headOption.flatMap(_.partitionMetadata.asScala.headOption)
           actualIsr = firstPartitionMetadata.map { partitionMetadata =>
-            partitionMetadata.isr.asScala.sortBy(_.id)
-          }.getOrElse(Seq.empty)
+            partitionMetadata.inSyncReplicaIds.asScala.map(Int.unbox).toSet
+          }.getOrElse(Set.empty)
           expectedIsr == actualIsr
         }, s"Topic metadata not updated correctly in broker $broker\n" +
           s"Expected ISR: $expectedIsr \n" +

