kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4980: testReprocessingFromScratch unit test failure
Date Thu, 30 Mar 2017 20:30:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e075fe6a -> 92b7d7570


KAFKA-4980: testReprocessingFromScratch unit test failure

We got the test error `org.apache.kafka.common.errors.TopicExistsException: Topic 'inputTopic'
already exists.` in some builds. It can be reproduced reliably on a local machine. The root cause
is the asynchronous "topic delete", which might not have finished before the topic gets re-created.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma, Damian Guy, Guozhang Wang

Closes #2757 from mjsax/minor-fix-resetintegrationtest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92b7d757
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92b7d757
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92b7d757

Branch: refs/heads/trunk
Commit: 92b7d7570027cdc4e069862472ce6ec6b3dad04a
Parents: 2e075fe
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Mar 30 13:30:10 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 30 13:30:10 2017 -0700

----------------------------------------------------------------------
 .../integration/ResetIntegrationTest.java       | 33 +++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/92b7d757/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 695e900..cf35902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -117,7 +117,7 @@ public class ResetIntegrationTest {
             try {
                 TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
                         "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-            } catch (TimeoutException e) {
+            } catch (final TimeoutException e) {
                 continue;
             }
             break;
@@ -300,6 +300,9 @@ public class ResetIntegrationTest {
         } catch (final UnknownTopicOrPartitionException e) {
             // ignore
         }
+
+        waitUntilUserTopicsAreDeleted();
+
         CLUSTER.createTopic(INPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_2);
@@ -402,6 +405,34 @@ public class ResetIntegrationTest {
         Assert.assertEquals(0, exitCode);
     }
 
+    private void waitUntilUserTopicsAreDeleted() {
+        ZkUtils zkUtils = null;
+        try {
+            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            while (userTopicExists(new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())))) {
+                Utils.sleep(100);
+            }
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
+
+    private boolean userTopicExists(final Set<String> allTopics) {
+        final Set<String> expectedMissingTopics = new HashSet<>();
+        expectedMissingTopics.add(INPUT_TOPIC);
+        expectedMissingTopics.add(OUTPUT_TOPIC);
+        expectedMissingTopics.add(OUTPUT_TOPIC_2);
+        expectedMissingTopics.add(OUTPUT_TOPIC_2_RERUN);
+
+        return expectedMissingTopics.removeAll(allTopics);
+    }
+
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
         final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
         expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
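
Note on the wait loop in this diff: the sketch below is not part of the commit. It
shows the same poll-until-deleted idea in isolation, under two assumptions that differ
from the patch: the topic list comes from a generic Supplier instead of ZkUtils, and
the wait is bounded by a timeout (the committed loop polls every 100 ms with no upper
limit). The class name TopicDeletionWait, both method names, and the timeout and poll
parameters are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper, not taken from the Kafka code base.
class TopicDeletionWait {

    // True if at least one user topic is still listed. Same result as the
    // removeAll-based check in the patch, but without mutating a set.
    public static boolean anyTopicExists(final Set<String> userTopics,
                                         final Set<String> allTopics) {
        return !Collections.disjoint(userTopics, allTopics);
    }

    // Polls topicLister until no user topic is listed or the timeout expires.
    // Returns true if the topics disappeared in time; returns false on
    // timeout or if the waiting thread is interrupted.
    public static boolean waitUntilDeleted(final Set<String> userTopics,
                                           final Supplier<Set<String>> topicLister,
                                           final long timeoutMs,
                                           final long pollMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (anyTopicExists(userTopics, topicLister.get())) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final Set<String> userTopics =
            new HashSet<>(Arrays.asList("inputTopic", "outputTopic"));

        // Simulated asynchronous delete: "inputTopic" is still listed for the
        // first two polls and is gone from the third poll onwards.
        final AtomicInteger polls = new AtomicInteger();
        final Supplier<Set<String>> lister = () -> polls.incrementAndGet() < 3
            ? new HashSet<>(Arrays.asList("inputTopic", "otherTopic"))
            : new HashSet<>(Collections.singletonList("otherTopic"));

        System.out.println(waitUntilDeleted(userTopics, lister, 5000, 10));
    }
}
```

With a bounded wait, a delete that never completes surfaces as a false return value
that the test can assert on, instead of a build that hangs.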

