kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9505: Only loop over topics-to-validate in retries (#8039)
Date Mon, 10 Feb 2020 21:00:32 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 78ec58a  KAFKA-9505: Only loop over topics-to-validate in retries (#8039)
78ec58a is described below

commit 78ec58af6bd80d4f798d6bc20b299d0bb476ccff
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Feb 10 12:59:14 2020 -0800

    KAFKA-9505: Only loop over topics-to-validate in retries (#8039)
    
    Found this bug through repeated flaky runs of the system tests. It seems to have been lurking
for a long time, but it only happens when there are frequent rebalances / topic creations within
a short time, which is exactly the case in some of our smoke system tests.
    
    Also added a unit test.
    
    Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>,
Matthias J. Sax <matthias@confluent.io>
---
 .../kafka/clients/admin/CreateTopicsResult.java    |  4 +-
 .../kafka/clients/admin/DescribeTopicsResult.java  |  2 +-
 .../kafka/clients/admin/KafkaAdminClient.java      |  2 +-
 .../processor/internals/InternalTopicManager.java  | 17 ++--
 .../internals/InternalTopicManagerTest.java        | 90 ++++++++++++++++++----
 5 files changed, 90 insertions(+), 25 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
index 85c0a90..eac086a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
@@ -35,7 +35,7 @@ public class CreateTopicsResult {
 
     private final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures;
 
-    CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>> futures)
{
+    protected CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>>
futures) {
         this.futures = futures;
     }
 
@@ -94,7 +94,7 @@ public class CreateTopicsResult {
         return futures.get(topic).thenApply(TopicMetadataAndConfig::replicationFactor);
     }
 
-    static class TopicMetadataAndConfig {
+    public static class TopicMetadataAndConfig {
         private final ApiException exception;
         private final int numPartitions;
         private final int replicationFactor;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 9822b42..34698b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
 public class DescribeTopicsResult {
     private final Map<String, KafkaFuture<TopicDescription>> futures;
 
-    DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures)
{
+    protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>>
futures) {
         this.futures = futures;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 52332f8..281a8c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1037,7 +1037,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
                 ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder,
now,
                         true, requestTimeoutMs, null);
-                log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
+                log.debug("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
                 client.send(clientRequest, now);
                 getOrCreateListValue(callsInFlight, node.idString()).add(call);
                 correlationIdToCalls.put(clientRequest.correlationId(), call);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index ab21493..e6fb926 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -60,8 +60,7 @@ public class InternalTopicManager {
     private final int retries;
     private final long retryBackOffMs;
 
-    public InternalTopicManager(final Admin adminClient,
-                                final StreamsConfig streamsConfig) {
+    public InternalTopicManager(final Admin adminClient, final StreamsConfig streamsConfig)
{
         this.adminClient = adminClient;
 
         final LogContext logContext = new LogContext(String.format("stream-thread [%s] ",
Thread.currentThread().getName()));
@@ -202,7 +201,7 @@ public class InternalTopicManager {
                 if (cause instanceof UnknownTopicOrPartitionException ||
                     cause instanceof LeaderNotAvailableException) {
                     // This topic didn't exist or leader is not known yet, proceed to try
to create it
-                    log.debug("Topic {} is unknown or not found, hence not existed yet.",
topicName);
+                    log.debug("Topic {} is unknown or not found, hence not existed yet: {}",
topicName, cause.toString());
                 } else {
                     log.error("Unexpected error during topic description for {}.\n" +
                         "Error message was: {}", topicName, cause.toString());
@@ -217,15 +216,17 @@ public class InternalTopicManager {
     /**
      * Check the existing topics to have correct number of partitions; and return the remaining
topics that needs to be created
      */
-    private Set<String> validateTopics(final Set<String> topicsToValidate,
-                                       final Map<String, InternalTopicConfig> topicsMap)
{
+    private Set<String> validateTopics(final Set<String> topicsToValidate, final
Map<String, InternalTopicConfig> topicsMap) {
+        if (!topicsMap.keySet().containsAll(topicsToValidate)) {
+            throw new IllegalStateException("The topics map " + topicsMap.keySet() + " does
not contain all the topics " +
+                topicsToValidate + " trying to validate.");
+        }
 
         final Map<String, Integer> existedTopicPartition = getNumPartitions(topicsToValidate);
 
         final Set<String> topicsToCreate = new HashSet<>();
-        for (final Map.Entry<String, InternalTopicConfig> entry : topicsMap.entrySet())
{
-            final String topicName = entry.getKey();
-            final Optional<Integer> numberOfPartitions = entry.getValue().numberOfPartitions();
+        for (final String topicName : topicsToValidate) {
+            final Optional<Integer> numberOfPartitions = topicsMap.get(topicName).numberOfPartitions();
             if (existedTopicPartition.containsKey(topicName) && numberOfPartitions.isPresent())
{
                 if (!existedTopicPartition.get(topicName).equals(numberOfPartitions.get()))
{
                     final String errorMsg = String.format("Existing internal topic %s has
invalid partitions: " +
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 074228a..579e458 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -16,19 +16,28 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,6 +47,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -90,18 +100,18 @@ public class InternalTopicManagerTest {
         mockAdminClient.addTopic(
             false,
             topic,
-            Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())),
             null);
         assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
     }
 
     @Test
     public void shouldCreateRequiredTopics() throws Exception {
-        final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.<String,
String>emptyMap());
+        final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
         topicConfig.setNumberOfPartitions(1);
-        final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig(topic2,
Collections.<String, String>emptyMap());
+        final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig(topic2,
Collections.emptyMap());
         topicConfig2.setNumberOfPartitions(1);
-        final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig(topic3,
Collections.<String, String>emptyMap());
+        final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig(topic3,
Collections.emptyMap());
         topicConfig3.setNumberOfPartitions(1);
 
         internalTopicManager.makeReady(Collections.singletonMap(topic, topicConfig));
@@ -111,17 +121,17 @@ public class InternalTopicManagerTest {
         assertEquals(Utils.mkSet(topic, topic2, topic3), mockAdminClient.listTopics().names().get());
         assertEquals(new TopicDescription(topic, false, new ArrayList<TopicPartitionInfo>()
{
             {
-                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
             }
         }), mockAdminClient.describeTopics(Collections.singleton(topic)).values().get(topic).get());
         assertEquals(new TopicDescription(topic2, false, new ArrayList<TopicPartitionInfo>()
{
             {
-                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
             }
         }), mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get());
         assertEquals(new TopicDescription(topic3, false, new ArrayList<TopicPartitionInfo>()
{
             {
-                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+                add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
             }
         }), mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get());
 
@@ -132,7 +142,49 @@ public class InternalTopicManagerTest {
         assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mockAdminClient.describeConfigs(Collections.singleton(resource)).values().get(resource).get().get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
mockAdminClient.describeConfigs(Collections.singleton(resource2)).values().get(resource2).get().get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
+ "," + TopicConfig.CLEANUP_POLICY_DELETE), mockAdminClient.describeConfigs(Collections.singleton(resource3)).values().get(resource3).get().get(TopicConfig.CLEANUP_POLICY_CONFIG));
+    }
 
+    @Test
+    public void shouldCompleteTopicValidationOnRetry() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+            Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new
KafkaFutureImpl<>();
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(new TopicDescription(topic, false, Collections.singletonList(partitionInfo),
Collections.emptySet()));
+        topicDescriptionFailFuture.completeExceptionally(new UnknownTopicOrPartitionException("KABOOM!"));
+
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture
= new KafkaFutureImpl<>();
+        topicCreationFuture.completeExceptionally(new TopicExistsException("KABOOM!"));
+
+        // let the first describe succeed on topic, and fail on topic2, and then let creation
throws topics-existed;
+        // it should retry with just topic2 and then let it succeed
+        EasyMock.expect(admin.describeTopics(Utils.mkSet(topic, topic2)))
+            .andReturn(new MockDescribeTopicsResult(Utils.mkMap(
+                Utils.mkEntry(topic, topicDescriptionSuccessFuture),
+                Utils.mkEntry(topic2, topicDescriptionFailFuture)
+            ))).once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(new NewTopic(topic2, Optional.of(1),
Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
+                                 Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
"CreateTime"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic2, topicCreationFuture))).once();
+        EasyMock.expect(admin.describeTopics(Collections.singleton(topic2)))
+            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic2, topicDescriptionSuccessFuture)));
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig topicConfig = new UnwindowedChangelogTopicConfig(topic,
Collections.emptyMap());
+        topicConfig.setNumberOfPartitions(1);
+        final InternalTopicConfig topic2Config = new UnwindowedChangelogTopicConfig(topic2,
Collections.emptyMap());
+        topic2Config.setNumberOfPartitions(1);
+        topicManager.makeReady(Utils.mkMap(
+            Utils.mkEntry(topic, topicConfig),
+            Utils.mkEntry(topic2, topic2Config)
+        ));
+
+        EasyMock.verify(admin);
     }
 
     @Test
@@ -142,14 +194,14 @@ public class InternalTopicManagerTest {
             topic,
             new ArrayList<TopicPartitionInfo>() {
                 {
-                    add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
-                    add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.<Node>emptyList()));
+                    add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
+                    add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList()));
                 }
             },
             null);
 
         try {
-            final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic,
Collections.<String, String>emptyMap());
+            final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic,
Collections.emptyMap());
             internalTopicConfig.setNumberOfPartitions(1);
             internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
             fail("Should have thrown StreamsException");
@@ -161,7 +213,7 @@ public class InternalTopicManagerTest {
         mockAdminClient.addTopic(
             false,
             topic,
-            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.<Node>emptyList())),
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
             null);
 
         // attempt to create it again with replication 1
@@ -213,11 +265,10 @@ public class InternalTopicManagerTest {
         topicConfigMap.put(topic, internalTopicConfig);
         topicConfigMap.put("internal-topic", internalTopicConfigII);
 
-
         internalTopicManager.makeReady(topicConfigMap);
         boolean foundExpectedMessage = false;
         for (final String message : appender.getMessages()) {
-            foundExpectedMessage |= message.contains("Topic internal-topic is unknown or
not found, hence not existed yet.");
+            foundExpectedMessage |= message.contains("Topic internal-topic is unknown or
not found, hence not existed yet");
         }
         assertTrue(foundExpectedMessage);
 
@@ -243,4 +294,17 @@ public class InternalTopicManagerTest {
         }
     }
 
+    private class MockCreateTopicsResult extends CreateTopicsResult {
+
+        MockCreateTopicsResult(final Map<String, KafkaFuture<TopicMetadataAndConfig>>
futures) {
+            super(futures);
+        }
+    }
+
+    private class MockDescribeTopicsResult extends DescribeTopicsResult {
+
+        MockDescribeTopicsResult(final Map<String, KafkaFuture<TopicDescription>>
futures) {
+            super(futures);
+        }
+    }
 }


Mime
View raw message