kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9558; Fix retry logic in KafkaAdminClient listOffsets (#8119)
Date Wed, 19 Feb 2020 17:16:57 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 0c7a33c  KAFKA-9558; Fix retry logic in KafkaAdminClient listOffsets (#8119)
0c7a33c is described below

commit 0c7a33cec13151ea30c6665fe1a11d8255db56c3
Author: Sanjana Kaundinya <skaundinya@confluent.io>
AuthorDate: Wed Feb 19 09:11:45 2020 -0800

    KAFKA-9558; Fix retry logic in KafkaAdminClient listOffsets (#8119)
    
    This PR is to fix the retry logic for `getListOffsetsCalls`. Previously, if there were
partitions with errors, it would only pass in the current call object to retry after a metadata
refresh. However, if there's a leader change, the call object never gets updated with the
correct leader node to query. This PR fixes this by making another call to `getListOffsetsCalls`
with only the error topic partitions as the next calls to be made after the metadata refresh.
In addition there is an a [...]
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  19 ++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 109 +++++++++++++++++++++
 2 files changed, 120 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 281a8c6..c2dd452 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3783,7 +3783,7 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 void handleResponse(AbstractResponse abstractResponse) {
                     ListOffsetResponse response = (ListOffsetResponse) abstractResponse;
-                    Set<TopicPartition> partitionsWithErrors = new HashSet<>();
+                    Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>();
 
                     for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) {
                         TopicPartition tp = result.getKey();
@@ -3791,8 +3791,11 @@ public class KafkaAdminClient extends AdminClient {
 
                         KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
                         Errors error = partitionData.error;
-                        if (MetadataOperationContext.shouldRefreshMetadata(error)) {
-                            partitionsWithErrors.add(tp);
+                        OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp);
+                        if (offsetRequestSpec == null) {
+                            future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!"));
+                        } else if (MetadataOperationContext.shouldRefreshMetadata(error)) {
+                            retryTopicPartitionOffsets.put(tp, offsetRequestSpec);
                         } else if (error == Errors.NONE) {
                             future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch));
                         } else {
@@ -3800,12 +3803,12 @@ public class KafkaAdminClient extends AdminClient {
                         }
                     }
 
-                    if (!partitionsWithErrors.isEmpty()) {
-                        partitionsToQuery.keySet().retainAll(partitionsWithErrors);
-                        Set<String> retryTopics = partitionsWithErrors.stream().map(tp -> tp.topic()).collect(Collectors.toSet());
+                    if (!retryTopicPartitionOffsets.isEmpty()) {
+                        Set<String> retryTopics = retryTopicPartitionOffsets.keySet().stream().map(
+                            TopicPartition::topic).collect(Collectors.toSet());
                         MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> retryContext =
-                                new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
-                        rescheduleMetadataTask(retryContext, () -> Collections.singletonList(this));
+                            new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
+                        rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures));
                     }
                 }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ed3e8a6..65a3436 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2496,6 +2496,115 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testListOffsetsWithMultiplePartitionsLeaderChange() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        Node node2 = new Node(2, "localhost", 8122);
+        List<Node> nodes = Arrays.asList(node0, node1, node2);
+
+        final PartitionInfo oldPInfo1 = new PartitionInfo("foo", 0, node0,
+            new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
+        final PartitionInfo oldPnfo2 = new PartitionInfo("foo", 1, node0,
+            new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
+        List<PartitionInfo> oldPInfos = Arrays.asList(oldPInfo1, oldPnfo2);
+
+        final Cluster oldCluster = new Cluster("mockClusterId", nodes, oldPInfos,
+            Collections.emptySet(), Collections.emptySet(), node0);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(oldCluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
+
+            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
+            responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -2L, 123L, Optional.of(456)));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
+
+            final PartitionInfo newPInfo1 = new PartitionInfo("foo", 0, node1,
+                new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
+            final PartitionInfo newPInfo2 = new PartitionInfo("foo", 1, node2,
+                new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
+            List<PartitionInfo> newPInfos = Arrays.asList(newPInfo1, newPInfo2);
+
+            final Cluster newCluster = new Cluster("mockClusterId", nodes, newPInfos,
+                Collections.emptySet(), Collections.emptySet(), node0);
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(newCluster, Errors.NONE));
+
+            responseData = new HashMap<>();
+            responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543)));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1);
+
+            responseData = new HashMap<>();
+            responseData.put(tp1, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456)));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node2);
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp0, OffsetSpec.latest());
+            partitions.put(tp1, OffsetSpec.latest());
+            ListOffsetsResult result = env.adminClient().listOffsets(partitions);
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+
+            assertFalse(offsets.isEmpty());
+            assertEquals(345L, offsets.get(tp0).offset());
+            assertEquals(543, offsets.get(tp0).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp0).timestamp());
+            assertEquals(123L, offsets.get(tp1).offset());
+            assertEquals(456, offsets.get(tp1).leaderEpoch().get().intValue());
+            assertEquals(-2L, offsets.get(tp1).timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsWithLeaderChange() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        Node node2 = new Node(2, "localhost", 8122);
+        List<Node> nodes = Arrays.asList(node0, node1, node2);
+
+        final PartitionInfo oldPartitionInfo = new PartitionInfo("foo", 0, node0,
+            new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
+        final Cluster oldCluster = new Cluster("mockClusterId", nodes, singletonList(oldPartitionInfo),
+            Collections.emptySet(), Collections.emptySet(), node0);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(oldCluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
+
+            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
+
+            // updating leader from node0 to node1 and metadata refresh because of NOT_LEADER_FOR_PARTITION
+            final PartitionInfo newPartitionInfo = new PartitionInfo("foo", 0, node1,
+                new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
+            final Cluster newCluster = new Cluster("mockClusterId", nodes, singletonList(newPartitionInfo),
+                Collections.emptySet(), Collections.emptySet(), node0);
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(newCluster, Errors.NONE));
+
+            responseData = new HashMap<>();
+            responseData.put(tp0, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456)));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1);
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp0, OffsetSpec.latest());
+            ListOffsetsResult result = env.adminClient().listOffsets(partitions);
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+
+            assertFalse(offsets.isEmpty());
+            assertEquals(123L, offsets.get(tp0).offset());
+            assertEquals(456, offsets.get(tp0).leaderEpoch().get().intValue());
+            assertEquals(-2L, offsets.get(tp0).timestamp());
+        }
+    }
+
+    @Test
     public void testListOffsetsMetadataNonRetriableErrors() throws Exception {
 
         Node node0 = new Node(0, "localhost", 8120);


Mime
View raw message