kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-9261; Client should handle inconsistent leader metadata (#7772)
Date Wed, 04 Dec 2019 04:40:11 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 77a5ec7  KAFKA-9261; Client should handle inconsistent leader metadata (#7772)
77a5ec7 is described below

commit 77a5ec72d19a60c1cb1523d7c22325260a9bc115
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Dec 3 19:30:50 2019 -0800

    KAFKA-9261; Client should handle inconsistent leader metadata (#7772)
    
    This is a reduced scope fix for KAFKA-9261. The purpose of this patch is to ensure that
    partition leader state is kept in sync with broker metadata in MetadataCache and
    consequently in Cluster. Due to the possibility of metadata event reordering, it was
    possible for this state to be inconsistent which could lead to an NPE in some cases. The
    test case here provides a specific scenario where this could happen.
    
    Also see #7770 for additional detail.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 26 ++++++-
 .../org/apache/kafka/clients/MetadataCache.java    |  6 +-
 .../kafka/common/requests/MetadataResponse.java    | 40 ++++++-----
 .../org/apache/kafka/clients/MetadataTest.java     | 83 ++++++++++++++++++++++
 4 files changed, 132 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 81ad4a9..2a3652a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -292,6 +292,8 @@ public class Metadata implements Closeable {
                                                  Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
         List<MetadataCache.PartitionInfoAndEpoch> partitions = new ArrayList<>();
+        Map<Integer, Node> brokersById = metadataResponse.brokersById();
+
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             if (!topicsToRetain.test(metadata))
                 continue;
@@ -299,12 +301,30 @@ public class Metadata implements Closeable {
             if (metadata.error() == Errors.NONE) {
                 if (metadata.isInternal())
                     internalTopics.add(metadata.topic());
-                for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
 
+                for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs
                     updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> {
                         int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                        partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
+                        Node leader = partitionInfo.leader();
+
+                        if (leader != null && !leader.equals(brokersById.get(leader.id()))) {
+                            // If we are reusing metadata from a previous response (which is possible if it
+                            // contained a larger epoch), we may not have leader information available in the
+                            // latest response. To keep the state consistent, we override the partition metadata
+                            // so that the leader is set consistently with the broker metadata
+
+                            PartitionInfo partitionInfoWithoutLeader = new PartitionInfo(
+                                    partitionInfo.topic(),
+                                    partitionInfo.partition(),
+                                    brokersById.get(leader.id()),
+                                    partitionInfo.replicas(),
+                                    partitionInfo.inSyncReplicas(),
+                                    partitionInfo.offlineReplicas());
+                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfoWithoutLeader, epoch));
+                        } else {
+                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
+                        }
                     });
 
                     if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
@@ -319,7 +339,7 @@ public class Metadata implements Closeable {
             }
         }
 
-        return new MetadataCache(metadataResponse.clusterId(), new ArrayList<>(metadataResponse.brokers()), partitions,
+        return new MetadataCache(metadataResponse.clusterId(), brokersById.values(), partitions,
                 metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
                 internalTopics, metadataResponse.controller());
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index b928b8e..0b8c853 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
  */
 public class MetadataCache {
     private final String clusterId;
-    private final List<Node> nodes;
+    private final Collection<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
@@ -49,7 +49,7 @@ public class MetadataCache {
     private Cluster clusterInstance;
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
+                  Collection<Node> nodes,
                   Collection<PartitionInfoAndEpoch> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
@@ -59,7 +59,7 @@ public class MetadataCache {
     }
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
+                  Collection<Node> nodes,
                   Collection<PartitionInfoAndEpoch> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index ef5381b..350da2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -20,9 +20,9 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.message.MetadataResponseData;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -178,6 +177,14 @@ public class MetadataResponse extends AbstractResponse {
      * @return the brokers
      */
     public Collection<Node> brokers() {
+        return holder().brokers.values();
+    }
+
+    /**
+     * Get a map of all brokers keyed by the brokerId.
+     * @return A map from the brokerId to the broker `Node` information
+     */
+    public Map<Integer, Node> brokersById() {
         return holder().brokers;
     }
 
@@ -360,23 +367,22 @@ public class MetadataResponse extends AbstractResponse {
     }
 
     private static class Holder {
-        private final Collection<Node> brokers;
+        private final Map<Integer, Node> brokers;
         private final Node controller;
         private final Collection<TopicMetadata> topicMetadata;
 
         Holder(MetadataResponseData data) {
-            this.brokers = Collections.unmodifiableCollection(createBrokers(data));
-            Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
-            this.topicMetadata = createTopicMetadata(data, brokerMap);
-            this.controller = brokerMap.get(data.controllerId());
+            this.brokers = createBrokers(data);
+            this.topicMetadata = createTopicMetadata(data);
+            this.controller = brokers.get(data.controllerId());
         }
 
-        private Collection<Node> createBrokers(MetadataResponseData data) {
-            return data.brokers().valuesList().stream().map(b ->
-                    new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
+        private Map<Integer, Node> createBrokers(MetadataResponseData data) {
+            return data.brokers().valuesList().stream().map(b -> new Node(b.nodeId(), b.host(), b.port(), b.rack()))
+                    .collect(Collectors.toMap(Node::id, b -> b));
         }
 
-        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data) {
             List<TopicMetadata> topicMetadataList = new ArrayList<>();
             for (MetadataResponseTopic topicMetadata : data.topics()) {
                 Errors topicError = Errors.forCode(topicMetadata.errorCode());
@@ -389,10 +395,10 @@ public class MetadataResponse extends AbstractResponse {
                     int partitionIndex = partitionMetadata.partitionIndex();
                     int leader = partitionMetadata.leaderId();
                     Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
-                    Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
-                    List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
-                    List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
-                    List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
+                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                    List<Node> replicaNodes = convertToNodes(partitionMetadata.replicaNodes());
+                    List<Node> isrNodes = convertToNodes(partitionMetadata.isrNodes());
+                    List<Node> offlineNodes = convertToNodes(partitionMetadata.offlineReplicas());
                     partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
                             replicaNodes, isrNodes, offlineNodes));
                 }
@@ -403,7 +409,7 @@ public class MetadataResponse extends AbstractResponse {
             return topicMetadataList;
         }
 
-        private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
+        private List<Node> convertToNodes(List<Integer> brokerIds) {
             List<Node> nodes = new ArrayList<>(brokerIds.size());
             for (Integer brokerId : brokerIds) {
                 Node node = brokers.get(brokerId);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index bb34094..9a9026d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -23,6 +23,12 @@ import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrokerCollection;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -33,9 +39,12 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -495,4 +504,78 @@ public class MetadataTest {
         assertEquals(metadata.fetch().nodeById(0).id(), 0);
         assertEquals(metadata.fetch().nodeById(1).id(), 1);
     }
+
+    @Test
+    public void testLeaderMetadataInconsistentWithBrokerMetadata() {
+        // Tests a reordering scenario which can lead to inconsistent leader state.
+        // A partition initially has one broker offline. That broker comes online and
+        // is elected leader. The client sees these two events in the opposite order.
+
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node2 = new Node(2, "localhost", 9094);
+
+        // The first metadata received by broker (epoch=10)
+        MetadataResponsePartition firstPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(10)
+                .setLeaderId(0)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(0, 1, 2))
+                .setOfflineReplicas(Collections.emptyList());
+
+        // The second metadata received has stale metadata (epoch=8)
+        MetadataResponsePartition secondPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(8)
+                .setLeaderId(1)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(1, 2))
+                .setOfflineReplicas(Collections.singletonList(0));
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), firstPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node0, node1, node2)))),
+                10L);
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), secondPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node1, node2)))),
+                20L);
+
+        assertNull(metadata.fetch().leaderFor(tp));
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertTrue(metadata.leaderAndEpoch(tp).leader.isEmpty());
+    }
+
+    private MetadataResponseTopicCollection buildTopicCollection(String topic, MetadataResponsePartition partitionMetadata) {
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setIsInternal(false);
+
+        topicMetadata.setPartitions(Collections.singletonList(partitionMetadata));
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+        return topics;
+    }
+
+    private MetadataResponseBrokerCollection buildBrokerCollection(List<Node> nodes) {
+        MetadataResponseBrokerCollection brokers = new MetadataResponseBrokerCollection();
+        for (Node node : nodes) {
+            MetadataResponseBroker broker = new MetadataResponseBroker()
+                    .setNodeId(node.id())
+                    .setHost(node.host())
+                    .setPort(node.port())
+                    .setRack(node.rack());
+            brokers.add(broker);
+        }
+        return brokers;
+    }
+
 }


Mime
View raw message