kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.7 updated: KAFKA-10559: Not letting TimeoutException shutdown the app during internal topic validation (#9432)
Date Thu, 15 Oct 2020 22:18:22 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 1c1f3ea  KAFKA-10559: Not letting TimeoutException shutdown the app during internal
topic validation (#9432)
1c1f3ea is described below

commit 1c1f3ea0b032850b1291390b6fc8928ef7195359
Author: vamossagar12 <sagarmeansocean@gmail.com>
AuthorDate: Fri Oct 16 03:40:21 2020 +0530

    KAFKA-10559: Not letting TimeoutException shutdown the app during internal topic validation
(#9432)
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../internals/StreamsPartitionAssignor.java        |  4 +-
 .../internals/StreamsPartitionAssignorTest.java    | 86 ----------------------
 2 files changed, 2 insertions(+), 88 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index d7df48f..d353e04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -345,7 +345,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
         final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
         try {
             allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
-        } catch (final TaskAssignmentException | TimeoutException e) {
+        } catch (final TaskAssignmentException e) {
             return new GroupAssignment(
                 errorAssignment(clientMetadataMap,
                     AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
@@ -376,7 +376,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
         final boolean probingRebalanceNeeded;
         try {
             probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics,
topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
-        } catch (final TaskAssignmentException | TimeoutException e) {
+        } catch (final TaskAssignmentException e) {
             return new GroupAssignment(
                 errorAssignment(clientMetadataMap,
                     AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 43b7f3e..c1533f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
@@ -41,7 +40,6 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TopologyWrapper;
-import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -51,7 +49,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
@@ -1184,89 +1181,6 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldReturnShutdownErrorCodeWhenCreatingRepartitionTopicsTimesOut() {
-        final StreamsBuilder streamsBuilder = new StreamsBuilder();
-        streamsBuilder.stream("topic1").repartition();
-
-        final String client = "client1";
-        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
-
-        createDefaultMockTaskManager();
-        EasyMock.replay(taskManager);
-        partitionAssignor.configure(configProps());
-        final MockInternalTopicManager mockInternalTopicManager =  new MockInternalTopicManager(
-            time,
-            new StreamsConfig(configProps()),
-            mockClientSupplier.restoreConsumer,
-            false
-        ) {
-            @Override
-            public Set<String> makeReady(final Map<String, InternalTopicConfig>
topics) throws TaskAssignmentException {
-                throw new TimeoutException("KABOOM!");
-            }
-        };
-        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
-
-        subscriptions.put(client,
-            new Subscription(
-                singletonList("topic1"),
-                defaultSubscriptionInfo.encode()
-            )
-        );
-        final Map<String, Assignment> assignment =
-            partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
-
-        // check if we created a task for all expected topicPartitions.
-        assertThat(
-            AssignmentInfo.decode(assignment.get(client).userData()).errCode(),
-            equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-        );
-    }
-
-    @Test
-    public void shouldReturnShutdownErrorCodeWhenCreatingChangelogTopicsTimesOut() {
-        final StreamsBuilder streamsBuilder = new StreamsBuilder();
-        streamsBuilder.table("topic1", Materialized.as("store"));
-
-        final String client = "client1";
-        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
-
-        createDefaultMockTaskManager();
-        EasyMock.replay(taskManager);
-        partitionAssignor.configure(configProps());
-        final MockInternalTopicManager mockInternalTopicManager =  new MockInternalTopicManager(
-            time,
-            new StreamsConfig(configProps()),
-            mockClientSupplier.restoreConsumer,
-            false
-        ) {
-            @Override
-            public Set<String> makeReady(final Map<String, InternalTopicConfig>
topics) throws TaskAssignmentException {
-                if (topics.isEmpty()) {
-                    return emptySet();
-                }
-                throw new TimeoutException("KABOOM!");
-            }
-        };
-        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
-
-        subscriptions.put(client,
-            new Subscription(
-                singletonList("topic1"),
-                defaultSubscriptionInfo.encode()
-            )
-        );
-        final Map<String, Assignment> assignment =
-            partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
-
-        // check if we created a task for all expected topicPartitions.
-        assertThat(
-            AssignmentInfo.decode(assignment.get(client).userData()).errCode(),
-            equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-        );
-    }
-
-    @Test
     public void shouldAddUserDefinedEndPointToSubscription() {
         builder.addSource(null, "source", null, null, null, "input");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");


Mime
View raw message