kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)
Date Tue, 04 Aug 2020 13:57:18 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 67b8780  KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)
67b8780 is described below

commit 67b87807b21548c7b5c675284438429d9f486ca0
Author: Chia-Ping Tsai <chia7712@gmail.com>
AuthorDate: Tue Aug 4 21:53:10 2020 +0800

    KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)
    
    Creating a topic may fail (due to timeout) in running system tests. However, `RoundTripWorker`
does not ignore `TopicExistsException` which makes `round_trip_fault_test.py` be a flaky one.
    
    More specifically, a network exception can cause the `CreateTopics` request to reach Kafka
but Trogdor retry it
    and hit a `TopicAlreadyExists` exception on the retry, failing the test.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java   | 3 ++-
 .../main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index faf2d96..812129e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -259,7 +260,7 @@ public final class WorkerUtils {
             // map will always contain the topic since all topics in 'topicsExists' are in
given
             // 'topics' map
             int partitions = topicsInfo.get(desc.name()).numPartitions();
-            if (desc.partitions().size() != partitions) {
+            if (partitions != CreateTopicsRequest.NO_NUM_PARTITIONS && desc.partitions().size()
!= partitions) {
                 String str = "Topic '" + desc.name() + "' exists, but has "
                              + desc.partitions().size() + " partitions, while requested "
                              + " number of partitions is " + partitions;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 643d22c..643555a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -147,7 +147,7 @@ public class RoundTripWorker implements TaskWorker {
                 }
                 status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
                 WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
-                    spec.adminClientConf(), newTopics, true);
+                    spec.adminClientConf(), newTopics, false);
                 status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
                 toSendTracker = new ToSendTracker(spec.maxMessages());
                 toReceiveTracker = new ToReceiveTracker();


Mime
View raw message