kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-6928: Refactor StreamsPartitionAssignor retry logic (#6085)
Date Fri, 04 Jan 2019 18:49:54 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new e501f52  KAFKA-6928: Refactor StreamsPartitionAssignor retry logic (#6085)
e501f52 is described below

commit e501f5273032839d5e4b580a9fcef4fac0188970
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Jan 4 10:49:08 2019 -0800

    KAFKA-6928: Refactor StreamsPartitionAssignor retry logic (#6085)
    
    1. The retry loop of InternalTopicManager is now simply: (a) describe the topics and exclude
    those that already exist with the right number of partitions; (b) try to create the remaining
    topics. All inner retry loops are removed.
    
    2. In CreateTopicsResponse and MetadataResponse (for describe topics), handle the special
    error codes TopicExists and UnknownTopicOrPartition (describe also tolerates LeaderNotAvailable)
    so that the affected topics are retried in the next loop iteration.
    
    3. Do not handle TimeoutException, since it should already have been handled inside AdminClient.
    
    Add corresponding unit tests for (a) a topic that is marked for deletion but whose deletion
    has not completed yet, in which case the metadata response does not contain the topic but
    create-topic returns a TopicExists error; and (b) requests that keep timing out.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>
---
 .../kafka/clients/admin/MockAdminClient.java       |  17 +-
 .../processor/internals/InternalTopicManager.java  | 219 ++++++++++-----------
 .../internals/InternalTopicManagerTest.java        |  31 ++-
 3 files changed, 140 insertions(+), 127 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b5131ae..9fe1ba4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -108,6 +108,14 @@ public class MockAdminClient extends AdminClient {
         allTopics.put(name, new TopicMetadata(internal, partitions, configs));
     }
 
+    public void markTopicForDeletion(final String name) {
+        if (!allTopics.containsKey(name)) {
+            throw new IllegalArgumentException(String.format("Topic %s did not exist.", name));
+        }
+
+        allTopics.get(name).markedForDeletion = true;
+    }
+
     public void timeoutNextRequest(int numberOfRequest) {
         timeoutNextRequests = numberOfRequest;
     }
@@ -167,7 +175,7 @@ public class MockAdminClient extends AdminClient {
             int numberOfPartitions = newTopic.numPartitions();
             List<TopicPartitionInfo> partitions = new ArrayList<>(numberOfPartitions);
             for (int p = 0; p < numberOfPartitions; ++p) {
-                partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.<Node>emptyList()));
+                partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.emptyList()));
             }
             allTopics.put(topicName, new TopicMetadata(false, partitions, newTopic.configs()));
             future.complete(null);
@@ -217,7 +225,7 @@ public class MockAdminClient extends AdminClient {
         for (String requestedTopic : topicNames) {
             for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet())
{
                 String topicName = topicDescription.getKey();
-                if (topicName.equals(requestedTopic)) {
+                if (topicName.equals(requestedTopic) && !topicDescription.getValue().markedForDeletion)
{
                     TopicMetadata topicMetadata = topicDescription.getValue();
                     KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
                     future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic,
topicMetadata.partitions));
@@ -386,12 +394,15 @@ public class MockAdminClient extends AdminClient {
         final List<TopicPartitionInfo> partitions;
         final Map<String, String> configs;
 
+        public boolean markedForDeletion;
+
         TopicMetadata(boolean isInternalTopic,
                       List<TopicPartitionInfo> partitions,
                       Map<String, String> configs) {
             this.isInternalTopic = isInternalTopic;
             this.partitions = partitions;
-            this.configs = configs != null ? configs : Collections.<String, String>emptyMap();
+            this.configs = configs != null ? configs : Collections.emptyMap();
+            this.markedForDeletion = false;
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 7e35126..40c25d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -23,16 +23,15 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -95,173 +94,155 @@ public class InternalTopicManager {
      * If a topic exists already but has different number of partitions we fail and throw
exception requesting user to reset the app before restarting again.
      */
     public void makeReady(final Map<String, InternalTopicConfig> topics) {
-        final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet());
-        final Set<InternalTopicConfig> topicsToBeCreated = validateTopicPartitions(topics.values(),
existingTopicPartitions);
-        if (topicsToBeCreated.size() > 0) {
-            final Set<NewTopic> newTopics = new HashSet<>();
+        // we will do the validation / topic-creation in a loop, until we have confirmed
all topics
+        // have existed with the expected number of partitions, or some create topic returns
fatal errors.
 
-            for (final InternalTopicConfig internalTopicConfig : topicsToBeCreated) {
-                final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+        int remainingRetries = retries;
+        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
 
-                log.debug("Going to create topic {} with {} partitions and config {}.",
-                        internalTopicConfig.name(),
-                        internalTopicConfig.numberOfPartitions(),
-                        topicConfig);
+        while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
+            topicsNotReady = validateTopics(topicsNotReady, topics);
+
+            if (topicsNotReady.size() > 0) {
+                final Set<NewTopic> newTopics = new HashSet<>();
 
-                newTopics.add(
-                    new NewTopic(
+                for (final String topicName : topicsNotReady) {
+                    final InternalTopicConfig internalTopicConfig = Utils.notNull(topics.get(topicName));
+                    final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+
+                    log.debug("Going to create topic {} with {} partitions and config {}.",
                         internalTopicConfig.name(),
                         internalTopicConfig.numberOfPartitions(),
-                        replicationFactor)
-                    .configs(topicConfig));
-            }
+                        topicConfig);
 
-            // TODO: KAFKA-6928. should not need retries in the outer caller as it will be
retried internally in admin client
-            int remainingRetries = retries;
-            boolean retryBackOff = false;
-            boolean retry;
-            do {
-                retry = false;
+                    newTopics.add(
+                        new NewTopic(
+                            internalTopicConfig.name(),
+                            internalTopicConfig.numberOfPartitions(),
+                            replicationFactor)
+                            .configs(topicConfig));
+                }
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                final Set<String> createdTopicNames = new HashSet<>();
                 for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult
: createTopicsResult.values().entrySet()) {
+                    final String topicName = createTopicResult.getKey();
                     try {
-                        if (retryBackOff) {
-                            retryBackOff = false;
-                            Thread.sleep(retryBackOffMs);
-                        }
                         createTopicResult.getValue().get();
-                        createdTopicNames.add(createTopicResult.getKey());
-                    } catch (final ExecutionException couldNotCreateTopic) {
-                        final Throwable cause = couldNotCreateTopic.getCause();
-                        final String topicName = createTopicResult.getKey();
-
-                        if (cause instanceof TimeoutException) {
-                            retry = true;
-                            log.debug("Could not get number of partitions for topic {} due
to timeout. " +
-                                "Will try again (remaining retries {}).", topicName, remainingRetries
- 1);
-                        } else if (cause instanceof TopicExistsException) {
-                            // This topic didn't exist earlier, it might be marked for deletion
or it might differ
-                            // from the desired setup. It needs re-validation.
-                            final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName));
-
-                            if (existingTopicPartition.containsKey(topicName)
-                                    && validateTopicPartitions(Collections.singleton(topics.get(topicName)),
existingTopicPartition).isEmpty()) {
-                                createdTopicNames.add(createTopicResult.getKey());
-                                log.info("Topic {} exists already and has the right number
of partitions: {}",
-                                        topicName,
-                                        couldNotCreateTopic.toString());
-                            } else {
-                                retry = true;
-                                retryBackOff = true;
-                                log.info("Could not create topic {}. Topic is probably marked
for deletion (number of partitions is unknown).\n" +
-                                        "Will retry to create this topic in {} ms (to let
broker finish async delete operation first).\n" +
-                                        "Error message was: {}", topicName, retryBackOffMs,
couldNotCreateTopic.toString());
-                            }
-                        } else {
-                            throw new StreamsException(String.format("Could not create topic
%s.", topicName),
-                                couldNotCreateTopic);
-                        }
+                        topicsNotReady.remove(topicName);
                     } catch (final InterruptedException fatalException) {
+                        // this should not happen; if it ever happens it indicate a bug
                         Thread.currentThread().interrupt();
                         log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
                         throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                    } catch (final ExecutionException executionException) {
+                        final Throwable cause = executionException.getCause();
+                        if (cause instanceof TopicExistsException) {
+                            // This topic didn't exist earlier or its leader not known before;
just retain it for next round of validation.
+                            log.info("Could not create topic {}. Topic is probably marked
for deletion (number of partitions is unknown).\n" +
+                                "Will retry to create this topic in {} ms (to let broker
finish async delete operation first).\n" +
+                                "Error message was: {}", topicName, retryBackOffMs, cause.toString());
+                        } else {
+                            log.error("Unexpected error during topic creation for {}.\n"
+
+                                "Error message was: {}", topicName, cause.toString());
+                            throw new StreamsException(String.format("Could not create topic
%s.", topicName), cause);
+                        }
                     }
                 }
+            }
 
-                if (retry) {
-                    newTopics.removeIf(newTopic -> createdTopicNames.contains(newTopic.name()));
 
-                    continue;
+            if (!topicsNotReady.isEmpty()) {
+                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady,
retries);
+
+                try {
+                    Thread.sleep(retryBackOffMs);
+                } catch (final InterruptedException e) {
+                    // this is okay, we just wake up early
+                    Thread.currentThread().interrupt();
                 }
 
-                return;
-            } while (remainingRetries-- > 0);
+                remainingRetries--;
+            }
+        }
 
-            final String timeoutAndRetryError = "Could not create topics. " +
+        if (!topicsNotReady.isEmpty()) {
+            final String timeoutAndRetryError = String.format("Could not create topics after
%d retries. " +
                 "This can happen if the Kafka cluster is temporary not available. " +
-                "You can increase admin client config `retries` to be resilient against this
error.";
+                "You can increase admin client config `retries` to be resilient against this
error.", retries);
             log.error(timeoutAndRetryError);
             throw new StreamsException(timeoutAndRetryError);
         }
     }
 
     /**
-     * Get the number of partitions for the given topics
+     * Try to get the number of partitions for the given topics; return the number of partitions
for topics that already exists.
+     *
+     * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
     protected Map<String, Integer> getNumPartitions(final Set<String> topics)
{
         log.debug("Trying to check if topics {} have been created with expected number of
partitions.", topics);
 
-        // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried
internally in admin client
-        int remainingRetries = retries;
-        boolean retry;
-        do {
-            retry = false;
-
-            final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
-            final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
-
-            final Map<String, Integer> existingNumberOfPartitionsPerTopic = new HashMap<>();
-            for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture
: futures.entrySet()) {
-                try {
-                    final TopicDescription topicDescription = topicFuture.getValue().get();
-                    existingNumberOfPartitionsPerTopic.put(
-                        topicFuture.getKey(),
-                        topicDescription.partitions().size());
-                } catch (final InterruptedException fatalException) {
-                    Thread.currentThread().interrupt();
-                    log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                    throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                } catch (final ExecutionException couldNotDescribeTopicException) {
-                    final Throwable cause = couldNotDescribeTopicException.getCause();
-                    if (cause instanceof TimeoutException) {
-                        retry = true;
-                        log.debug("Could not get number of partitions for topic {} due to
timeout. " +
-                            "Will try again (remaining retries {}).", topicFuture.getKey(),
remainingRetries - 1);
-                    } else {
-                        final String error = "Could not get number of partitions for topic
{} due to {}";
-                        log.debug(error, topicFuture.getKey(), cause.toString());
-                    }
+        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+        final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
+
+        final Map<String, Integer> existedTopicPartition = new HashMap<>();
+        for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture
: futures.entrySet()) {
+            final String topicName = topicFuture.getKey();
+            try {
+                final TopicDescription topicDescription = topicFuture.getValue().get();
+                existedTopicPartition.put(
+                    topicFuture.getKey(),
+                    topicDescription.partitions().size());
+            } catch (final InterruptedException fatalException) {
+                // this should not happen; if it ever happens it indicate a bug
+                Thread.currentThread().interrupt();
+                log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
+            } catch (final ExecutionException couldNotDescribeTopicException) {
+                final Throwable cause = couldNotDescribeTopicException.getCause();
+                if (cause instanceof UnknownTopicOrPartitionException ||
+                    cause instanceof LeaderNotAvailableException) {
+                    // This topic didn't exist or leader is not known yet, proceed to try
to create it
+                    log.debug("Topic {} is unknown, hence not existed yet.", topicName);
+                } else {
+                    log.error("Unexpected error during topic description for {}.\n" +
+                        "Error message was: {}", topicName, cause.toString());
+                    throw new StreamsException(String.format("Could not create topic %s.",
topicName), cause);
                 }
             }
+        }
 
-            if (retry) {
-                topics.removeAll(existingNumberOfPartitionsPerTopic.keySet());
-                continue;
-            }
-
-            return existingNumberOfPartitionsPerTopic;
-        } while (remainingRetries-- > 0);
-
-        return Collections.emptyMap();
+        return existedTopicPartition;
     }
 
     /**
-     * Check the existing topics to have correct number of partitions; and return the non
existing topics to be created
+     * Check the existing topics to have correct number of partitions; and return the remaining
topics that needs to be created
      */
-    private Set<InternalTopicConfig> validateTopicPartitions(final Collection<InternalTopicConfig>
topicsPartitionsMap,
-                                                             final Map<String, Integer>
existingTopicNamesPartitions) {
-        final Set<InternalTopicConfig> topicsToBeCreated = new HashSet<>();
-        for (final InternalTopicConfig topic : topicsPartitionsMap) {
-            final int numberOfPartitions = topic.numberOfPartitions();
-            if (existingTopicNamesPartitions.containsKey(topic.name())) {
-                if (!existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions))
{
+    private Set<String> validateTopics(final Set<String> topicsToValidate,
+                                       final Map<String, InternalTopicConfig> topicsMap)
{
+
+        final Map<String, Integer> existedTopicPartition = getNumPartitions(topicsToValidate);
+
+        final Set<String> topicsToCreate = new HashSet<>();
+        for (final Map.Entry<String, InternalTopicConfig> entry : topicsMap.entrySet())
{
+            final String topicName = entry.getKey();
+            final int numberOfPartitions = entry.getValue().numberOfPartitions();
+            if (existedTopicPartition.containsKey(topicName)) {
+                if (!existedTopicPartition.get(topicName).equals(numberOfPartitions)) {
                     final String errorMsg = String.format("Existing internal topic %s has
invalid partitions: " +
                             "expected: %d; actual: %d. " +
                             "Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics
before processing.",
-                        topic.name(), numberOfPartitions, existingTopicNamesPartitions.get(topic.name()));
+                        topicName, numberOfPartitions, existedTopicPartition.get(topicName));
                     log.error(errorMsg);
                     throw new StreamsException(errorMsg);
                 }
             } else {
-                topicsToBeCreated.add(topic);
+                topicsToCreate.add(topicName);
             }
         }
 
-        return topicsToBeCreated;
+        return topicsToCreate;
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 344dc05..e91bf32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -39,6 +40,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class InternalTopicManagerTest {
@@ -166,28 +168,47 @@ public class InternalTopicManagerTest {
             mockAdminClient,
             new StreamsConfig(config));
 
-        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic,
Collections.<String, String>emptyMap());
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic,
Collections.emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         internalTopicManager2.makeReady(Collections.singletonMap(topic, internalTopicConfig));
     }
 
     @Test
     public void shouldNotThrowExceptionForEmptyTopicMap() {
-        internalTopicManager.makeReady(Collections.<String, InternalTopicConfig>emptyMap());
+        internalTopicManager.makeReady(Collections.emptyMap());
     }
 
     @Test
     public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
-        mockAdminClient.timeoutNextRequest(4);
+        mockAdminClient.timeoutNextRequest(1);
 
-        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic,
Collections.<String, String>emptyMap());
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic,
Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        try {
+            internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
+            fail("Should have thrown StreamsException.");
+        } catch (final StreamsException expected) {
+            assertEquals(TimeoutException.class, expected.getCause().getClass());
+        }
+    }
+
+    @Test
+    public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null);
+        mockAdminClient.markTopicForDeletion(topic);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic,
Collections.emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         try {
             internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
             fail("Should have thrown StreamsException.");
         } catch (final StreamsException expected) {
             assertNull(expected.getCause());
-            assertEquals("Could not create topics. This can happen if the Kafka cluster is
temporary not available. You can increase admin client config `retries` to be resilient against
this error.", expected.getMessage());
+            assertTrue(expected.getMessage().startsWith("Could not create topics after 1
retries"));
         }
     }
 


Mime
View raw message