kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-6126: Remove unnecessary topics created check
Date Thu, 21 Dec 2017 02:02:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9cacb92d1 -> dca1474b4


KAFKA-6126: Remove unnecessary topics created check

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4322 from mjsax/kafka-6126-remove-topic-check-on-rebalance-2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dca1474b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dca1474b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dca1474b

Branch: refs/heads/trunk
Commit: dca1474b4b33b85dba461c61568b86b17d6be18f
Parents: 9cacb92
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Dec 20 18:02:33 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Dec 20 18:02:33 2017 -0800

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         | 27 +++----------
 .../internals/StreamPartitionAssignor.java      | 21 ----------
 .../internals/InternalTopicManagerTest.java     | 40 +-------------------
 .../kafka/test/MockInternalTopicManager.java    |  2 +-
 4 files changed, 7 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dca1474b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index f093d83..05d079b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -87,7 +87,7 @@ public class InternalTopicManager {
      * If a topic exists already but has different number of partitions we fail and throw
exception requesting user to reset the app before restarting again.
      */
     public void makeReady(final Map<String, InternalTopicConfig> topics) {
-        final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet(),
true);
+        final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet());
         final Set<InternalTopicConfig> topicsToBeCreated = validateTopicPartitions(topics.values(),
existingTopicPartitions);
         if (topicsToBeCreated.size() > 0) {
             final Set<NewTopic> newTopics = new HashSet<>();
@@ -169,12 +169,8 @@ public class InternalTopicManager {
     /**
      * Get the number of partitions for the given topics
      */
-    public Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        return getNumPartitions(topics, false);
-    }
-
-    private Map<String, Integer> getNumPartitions(final Set<String> topics,
-                                                  final boolean bestEffort) {
+    // visible for testing
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics)
{
         int remainingRetries = retries;
         boolean retry;
         do {
@@ -202,12 +198,7 @@ public class InternalTopicManager {
                             "Will try again (remaining retries {}).", topicFuture.getKey(),
remainingRetries - 1);
                     } else {
                         final String error = "Could not get number of partitions for topic
{}.";
-                        if (bestEffort) {
-                            log.debug(error, topicFuture.getKey(), cause.getMessage());
-                        } else {
-                            log.error(error, topicFuture.getKey(), cause);
-                            throw new StreamsException(cause);
-                        }
+                        log.debug(error, topicFuture.getKey(), cause.getMessage());
                     }
                 }
             }
@@ -220,15 +211,7 @@ public class InternalTopicManager {
             return existingNumberOfPartitionsPerTopic;
         } while (remainingRetries-- > 0);
 
-        if (bestEffort) {
-            return Collections.emptyMap();
-        }
-
-        final String timeoutAndRetryError = "Could not get number of partitions from brokers.
" +
-            "This can happen if the Kafka cluster is temporary not available. " +
-            "You can increase admin client config `retries` to be resilient against this
error.";
-        log.error(timeoutAndRetryError);
-        throw new StreamsException(timeoutAndRetryError);
+        return Collections.emptyMap();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/dca1474b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index d5a1fe6..ef9ca3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -631,32 +631,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         if (!topicsToMakeReady.isEmpty()) {
             internalTopicManager.makeReady(topicsToMakeReady);
-
-            // wait until each one of the topic metadata has been propagated to at least
one broker
-            while (!allTopicsCreated(topicsToMakeReady)) {
-                try {
-                    Thread.sleep(50L);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    // ignore
-                }
-            }
         }
 
         log.debug("Completed validating internal topics in partition assignor.");
     }
 
-    private boolean allTopicsCreated(final Map<String, InternalTopicConfig> topicsToMakeReady)
{
-        final Map<String, Integer> partitions = internalTopicManager.getNumPartitions(topicsToMakeReady.keySet());
-        for (final InternalTopicConfig topic : topicsToMakeReady.values()) {
-            final Integer numPartitions = partitions.get(topic.name());
-            if (numPartitions == null || !numPartitions.equals(topic.numberOfPartitions()))
{
-                return false;
-            }
-        }
-        return true;
-    }
-
     private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
                                       Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
                                       Cluster metadata) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dca1474b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 3210b23..d9189d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -32,17 +31,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class InternalTopicManagerTest {
@@ -82,7 +78,7 @@ public class InternalTopicManagerTest {
     }
 
     @After
-    public void shutdown() throws IOException {
+    public void shutdown() {
         mockAdminClient.close();
     }
 
@@ -97,40 +93,6 @@ public class InternalTopicManagerTest {
     }
 
     @Test
-    public void shouldFailWithUnknownTopicException() {
-        mockAdminClient.addTopic(
-            false,
-            topic,
-            Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
-            null);
-
-        try {
-            internalTopicManager.getNumPartitions(new HashSet<String>() {
-                {
-                    add(topic);
-                    add(topic2);
-                }
-            });
-            fail("Should have thrown UnknownTopicOrPartitionException.");
-        } catch (final StreamsException expected) {
-            assertTrue(expected.getCause() instanceof UnknownTopicOrPartitionException);
-        }
-    }
-
-    @Test
-    public void shouldExhaustRetriesOnTimeoutExceptionForGetNumPartitions() {
-        mockAdminClient.timeoutNextRequest(2);
-
-        try {
-            internalTopicManager.getNumPartitions(Collections.singleton(topic));
-            fail("Should have thrown StreamsException.");
-        } catch (final StreamsException expected) {
-            assertNull(expected.getCause());
-            assertEquals("Could not get number of partitions from brokers. This can happen
if the Kafka cluster is temporary not available. You can increase admin client config `retries`
to be resilient against this error.", expected.getMessage());
-        }
-    }
-
-    @Test
     public void shouldCreateRequiredTopics() throws Exception {
         final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.<String,
String>emptyMap());
         topicConfig.setNumberOfPartitions(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dca1474b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index ae73280..ea27045 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -60,7 +60,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
     }
 
     @Override
-    public Map<String, Integer> getNumPartitions(final Set<String> topics) {
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics)
{
         final Map<String, Integer> partitions = new HashMap<>();
         for (String topic : topics) {
             partitions.put(topic, restoreConsumer.partitionsFor(topic) == null ?  null :
restoreConsumer.partitionsFor(topic).size());


Mime
View raw message