kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS (#5929)
Date Thu, 13 Dec 2018 16:30:46 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 5d5da52  KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances,
potential DOS (#5929)
5d5da52 is described below

commit 5d5da52bd04d9c4c147c3e51bba616bd111be415
Author: Pasquale Vazzana <pasqualevazzana@gmail.com>
AuthorDate: Thu Dec 13 15:40:39 2018 +0000

    KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential
DOS (#5929)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  1 +
 .../processor/internals/InternalTopicManager.java  | 34 +++++++++++++++++-----
 .../apache/kafka/streams/StreamsConfigTest.java    |  5 +++-
 3 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b8963a8..5b76a18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -984,6 +984,7 @@ public class StreamsConfig extends AbstractConfig {
         // add admin retries configs for creating topics
         final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX,
AdminClientConfig.configNames()));
         consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
+        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), adminClientDefaultConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG));
 
         // verify that producer batch config is no larger than segment size, then add topic
configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 2c2df04..91a9ff2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -52,6 +52,7 @@ public class InternalTopicManager {
     private final AdminClient adminClient;
 
     private final int retries;
+    private final long retryBackOffMs;
 
     public InternalTopicManager(final AdminClient adminClient,
                                 final StreamsConfig streamsConfig) {
@@ -63,6 +64,7 @@ public class InternalTopicManager {
         replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
         windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
         retries = new AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
+        retryBackOffMs = new AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
 
         log.debug("Configs:" + Utils.NL,
             "\t{} = {}" + Utils.NL,
@@ -110,17 +112,22 @@ public class InternalTopicManager {
 
             // TODO: KAFKA-6928. should not need retries in the outer caller as it will be
retried internally in admin client
             int remainingRetries = retries;
+            boolean retryBackOff = false;
             boolean retry;
             do {
                 retry = false;
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                final Set<String> createTopicNames = new HashSet<>();
+                final Set<String> createdTopicNames = new HashSet<>();
                 for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult
: createTopicsResult.values().entrySet()) {
                     try {
+                        if (retryBackOff) {
+                            retryBackOff = false;
+                            Thread.sleep(retryBackOffMs);
+                        }
                         createTopicResult.getValue().get();
-                        createTopicNames.add(createTopicResult.getKey());
+                        createdTopicNames.add(createTopicResult.getKey());
                     } catch (final ExecutionException couldNotCreateTopic) {
                         final Throwable cause = couldNotCreateTopic.getCause();
                         final String topicName = createTopicResult.getKey();
@@ -130,10 +137,23 @@ public class InternalTopicManager {
                             log.debug("Could not get number of partitions for topic {} due
to timeout. " +
                                 "Will try again (remaining retries {}).", topicName, remainingRetries
- 1);
                         } else if (cause instanceof TopicExistsException) {
-                            createTopicNames.add(createTopicResult.getKey());
-                            log.info("Topic {} exist already: {}",
-                                topicName,
-                                couldNotCreateTopic.toString());
+                            // This topic didn't exist earlier, it might be marked for deletion
or it might differ
+                            // from the desired setup. It needs re-validation.
+                            final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName));
+
+                            if (existingTopicPartition.containsKey(topicName)
+                                    && validateTopicPartitions(Collections.singleton(topics.get(topicName)),
existingTopicPartition).isEmpty()) {
+                                createdTopicNames.add(createTopicResult.getKey());
+                                log.info("Topic {} exists already and has the right number
of partitions: {}",
+                                        topicName,
+                                        couldNotCreateTopic.toString());
+                            } else {
+                                retry = true;
+                                retryBackOff = true;
+                                log.info("Could not create topic {}. Topic is probably marked
for deletion (number of partitions is unknown).\n" +
+                                        "Will retry to create this topic in {} ms (to let
broker finish async delete operation first).\n" +
+                                        "Error message was: {}", topicName, retryBackOffMs,
couldNotCreateTopic.toString());
+                            }
                         } else {
                             throw new StreamsException(String.format("Could not create topic
%s.", topicName),
                                 couldNotCreateTopic);
@@ -148,7 +168,7 @@ public class InternalTopicManager {
                 if (retry) {
                     final Iterator<NewTopic> it = newTopics.iterator();
                     while (it.hasNext()) {
-                        if (createTopicNames.contains(it.next().name())) {
+                        if (createdTopicNames.contains(it.next().name())) {
                             it.remove();
                         }
                     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3dceaf6..7a837c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -129,9 +129,11 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void consumerConfigMustUseAdminClientConfigForRetries() {
+    public void consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix()
{
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20);
+        props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
200L);
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
+        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         final String groupId = "example-application";
@@ -139,6 +141,7 @@ public class StreamsConfigTest {
         final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId,
clientId);
 
         assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+        assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG)));
     }
 
     @Test


Mime
View raw message