kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures (#5418)
Date Tue, 24 Jul 2018 09:29:12 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new a9c50b2  KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures (#5418)
a9c50b2 is described below

commit a9c50b29f1727becdda738a62b9241ca72fdf0d3
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Tue Jul 24 14:59:01 2018 +0530

    KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures (#5418)
---
 .../integration/AbstractJoinIntegrationTest.java   |  2 +-
 .../integration/utils/EmbeddedKafkaCluster.java    | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 80ab606..3e29fc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -163,7 +163,7 @@ public abstract class AbstractJoinIntegrationTest {
 
     @After
     public void cleanup() throws InterruptedException {
-        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+        CLUSTER.deleteAllTopicsAndWait(120000);
     }
 
     private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index ce6324d..ab52649 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -274,6 +274,24 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         }
     }
 
+    /**
+     * Deletes all topics and blocks until all topics got deleted.
+     *
+     * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
+     */
+    public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
+        final Set<String> topics = new HashSet<>(JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
+        for (final String topic : topics) {
+            try {
+                brokers[0].deleteTopic(topic);
+            } catch (final UnknownTopicOrPartitionException e) { }
+        }
+
+        if (timeoutMs > 0) {
+            TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+        }
+    }
+
     public void deleteAndRecreateTopics(final String... topics) throws InterruptedException {
         deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
         createTopics(topics);
@@ -295,6 +313,10 @@ public class EmbeddedKafkaCluster extends ExternalResource {
             Collections.addAll(deletedTopics, topics);
         }
 
+        public TopicsDeletedCondition(final Set<String> topics) {
+            deletedTopics.addAll(topics);
+        }
+
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>(


Mime
View raw message