kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] 01/02: KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (#7449)
Date Tue, 08 Oct 2019 21:18:02 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c74b437d0081dbd9b3158ec10c9a54780d57e6a5
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Oct 8 08:19:07 2019 -0700

    KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (#7449)
    
    The deleteRecords API in the AdminClient groups the requested partitions by partition leader
    and sends one request to each leader. If one of these requests fails, we currently fail all
    futures, including those tied to requests sent to other leaders. It would be better to fail
    only those partitions included in the failed request.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 73 ++++++++++----------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 80 +++++++++++++++++++---
 .../test/java/org/apache/kafka/test/TestUtils.java | 17 +++++
 3 files changed, 127 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a7663da..d4958f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -80,9 +80,9 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
 import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
@@ -109,21 +109,21 @@ import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsRequest;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
-import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -144,8 +144,8 @@ import org.apache.kafka.common.requests.ElectLeadersRequest;
 import org.apache.kafka.common.requests.ElectLeadersResponse;
 import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
@@ -158,7 +158,6 @@ import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetDeleteRequest;
-import org.apache.kafka.common.requests.OffsetDeleteRequest.Builder;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -190,21 +189,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.TreeMap;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
-import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
 import static org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
-import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -315,9 +315,18 @@ public class KafkaAdminClient extends AdminClient {
      * @param <T>       The KafkaFutureImpl result type.
      */
     private static <T> void completeAllExceptionally(Collection<KafkaFutureImpl<T>>
futures, Throwable exc) {
-        for (KafkaFutureImpl<?> future : futures) {
-            future.completeExceptionally(exc);
-        }
+        completeAllExceptionally(futures.stream(), exc);
+    }
+
+    /**
+     * Send an exception to all futures in the provided stream
+     *
+     * @param futures   The stream of KafkaFutureImpl objects.
+     * @param exc       The exception
+     * @param <T>       The KafkaFutureImpl result type.
+     */
+    private static <T> void completeAllExceptionally(Stream<KafkaFutureImpl<T>>
futures, Throwable exc) {
+        futures.forEach(future -> future.completeExceptionally(exc));
     }
 
     /**
@@ -2364,39 +2373,31 @@ public class KafkaAdminClient extends AdminClient {
                 Map<String, Errors> errors = response.errors();
                 Cluster cluster = response.cluster();
 
-                // completing futures for topics with errors
-                for (Map.Entry<String, Errors> topicError: errors.entrySet()) {
-
-                    for (Map.Entry<TopicPartition, KafkaFutureImpl<DeletedRecords>>
future: futures.entrySet()) {
-                        if (future.getKey().topic().equals(topicError.getKey())) {
-                            future.getValue().completeExceptionally(topicError.getValue().exception());
-                        }
-                    }
-                }
-
-                // grouping topic partitions per leader
+                // Group topic partitions by leader
                 Map<Node, Map<TopicPartition, Long>> leaders = new HashMap<>();
                 for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet())
{
+                    KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey());
 
-                    // avoiding to send deletion request for topics with errors
-                    if (!errors.containsKey(entry.getKey().topic())) {
-
+                    // Fail partitions with topic errors
+                    Errors topicError = errors.get(entry.getKey().topic());
+                    if (errors.containsKey(entry.getKey().topic())) {
+                        future.completeExceptionally(topicError.exception());
+                    } else {
                         Node node = cluster.leaderFor(entry.getKey());
                         if (node != null) {
                             if (!leaders.containsKey(node))
                                 leaders.put(node, new HashMap<>());
                             leaders.get(node).put(entry.getKey(), entry.getValue().beforeOffset());
                         } else {
-                            KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey());
                             future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
                         }
                     }
                 }
 
-                for (final Map.Entry<Node, Map<TopicPartition, Long>> entry:
leaders.entrySet()) {
-
-                    final long nowDelete = time.milliseconds();
+                final long deleteRecordsCallTimeMs = time.milliseconds();
 
+                for (final Map.Entry<Node, Map<TopicPartition, Long>> entry :
leaders.entrySet()) {
+                    final Map<TopicPartition, Long> partitionDeleteOffsets = entry.getValue();
                     final int brokerId = entry.getKey().id();
 
                     runnable.call(new Call("deleteRecords", deadline,
@@ -2404,7 +2405,7 @@ public class KafkaAdminClient extends AdminClient {
 
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DeleteRecordsRequest.Builder(timeoutMs, entry.getValue());
+                            return new DeleteRecordsRequest.Builder(timeoutMs, partitionDeleteOffsets);
                         }
 
                         @Override
@@ -2423,9 +2424,11 @@ public class KafkaAdminClient extends AdminClient {
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            completeAllExceptionally(futures.values(), throwable);
+                            Stream<KafkaFutureImpl<DeletedRecords>> callFutures
=
+                                    partitionDeleteOffsets.keySet().stream().map(futures::get);
+                            completeAllExceptionally(callFutures, throwable);
                         }
-                    }, nowDelete);
+                    }, deleteRecordsCallTimeMs);
                 }
             }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b6fece9..6d870fc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.clients.admin;
 
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
@@ -52,13 +50,14 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
@@ -68,8 +67,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
@@ -83,14 +82,14 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -140,18 +139,20 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
-import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 /**
@@ -882,6 +883,69 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testDeleteRecordsTopicAuthorizationError() {
+        String topic = "foo";
+        TopicPartition partition = new TopicPartition(topic, 0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            List<MetadataResponse.TopicMetadata> topics = new ArrayList<>();
+            topics.add(new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED,
topic, false,
+                    Collections.emptyList()));
+
+            env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(), env.cluster().controller().id(),
topics));
+
+            Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+            recordsToDelete.put(partition, RecordsToDelete.beforeOffset(10L));
+            DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete);
+
+            TestUtils.assertFutureThrows(results.lowWatermarks().get(partition), TopicAuthorizationException.class);
+        }
+    }
+
+    @Test
+    public void testDeleteRecordsMultipleSends() throws Exception {
+        String topic = "foo";
+        TopicPartition tp0 = new TopicPartition(topic, 0);
+        TopicPartition tp1 = new TopicPartition(topic, 1);
+
+        Cluster cluster = mockCluster(0);
+        MockTime time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) {
+            List<Node> nodes = cluster.nodes();
+
+            List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(),
nodes.get(0),
+                    Optional.of(5), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
+                    Collections.emptyList()));
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(),
nodes.get(1),
+                    Optional.of(5), singletonList(nodes.get(1)), singletonList(nodes.get(1)),
Collections.emptyList()));
+
+            List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
partitionMetadata));
+
+            env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(cluster.nodes(),
+                    cluster.clusterResource().clusterId(), cluster.controller().id(), topicMetadata));
+
+            Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> deletedPartitions
= new HashMap<>();
+            deletedPartitions.put(tp0, new DeleteRecordsResponse.PartitionResponse(3, Errors.NONE));
+            env.kafkaClient().prepareResponseFrom(new DeleteRecordsResponse(0, deletedPartitions),
nodes.get(0));
+
+            env.kafkaClient().disconnect(nodes.get(1).idString());
+            env.kafkaClient().createPendingAuthenticationError(nodes.get(1), 100);
+
+            Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+            recordsToDelete.put(tp0, RecordsToDelete.beforeOffset(10L));
+            recordsToDelete.put(tp1, RecordsToDelete.beforeOffset(10L));
+            DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete);
+
+            assertEquals(3L, results.lowWatermarks().get(tp0).get().lowWatermark());
+            TestUtils.assertFutureThrows(results.lowWatermarks().get(tp1), AuthenticationException.class);
+        }
+    }
+
+    @Test
     public void testDeleteRecords() throws Exception {
 
         HashMap<Integer, Node> nodes = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 5858a16..2183e15 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -64,6 +64,7 @@ import java.util.regex.Pattern;
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -457,6 +458,22 @@ public class TestUtils {
         return tps;
     }
 
+    /**
+     * Assert that a future raises an expected exception cause type. Return the exception cause
+     * if the assertion succeeds; otherwise raise AssertionError.
+     *
+     * @param future The future to await
+     * @param exceptionCauseClass Class of the expected exception cause
+     * @param <T> Exception cause type parameter
+     * @return The caught exception cause
+     */
+    public static <T extends Throwable> T assertFutureThrows(Future<?> future,
Class<T> exceptionCauseClass) {
+        ExecutionException exception = assertThrows(ExecutionException.class, future::get);
+        assertTrue("Unexpected exception cause " + exception.getCause(),
+                exceptionCauseClass.isInstance(exception.getCause()));
+        return exceptionCauseClass.cast(exception.getCause());
+    }
+
     public static void assertFutureError(Future<?> future, Class<? extends Throwable>
exceptionClass)
         throws InterruptedException {
         try {


Mime
View raw message