kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4980: testReprocessingFromScratch unit test failure
Date Thu, 20 Apr 2017 23:05:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 0731d35e5 -> 495cdbde9


KAFKA-4980: testReprocessingFromScratch unit test failure

We got the test error `org.apache.kafka.common.errors.TopicExistsException: Topic 'inputTopic'
already exists.` in some builds. It can be reproduced reliably on a local machine. The root cause is
the asynchronous "topic delete", which might not have finished before the topic gets re-created.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma, Damian Guy, Guozhang Wang

Closes #2757 from mjsax/minor-fix-resetintegrationtest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/495cdbde
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/495cdbde
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/495cdbde

Branch: refs/heads/0.10.2
Commit: 495cdbde9e164513c8f982df80c0f29a871b62f2
Parents: 0731d35
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Mar 30 13:30:10 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Apr 20 16:05:31 2017 -0700

----------------------------------------------------------------------
 .../integration/ResetIntegrationTest.java       | 38 ++++++++++++++++++--
 1 file changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/495cdbde/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 27b9ea6..d653d00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,6 +23,7 @@ import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -115,9 +116,11 @@ public class ResetIntegrationTest {
             try {
                 TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
                         "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER
* CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-            } catch (GroupCoordinatorNotAvailableException e) {
+            } catch (final GroupCoordinatorNotAvailableException e) {
                 continue;
-            } catch (IllegalArgumentException e) {
+            } catch (final IllegalArgumentException e) {
+                continue;
+            } catch (final TimeoutException e) {
                 continue;
             }
             break;
@@ -301,6 +304,9 @@ public class ResetIntegrationTest {
         } catch (final UnknownTopicOrPartitionException e) {
             // ignore
         }
+
+        waitUntilUserTopicsAreDeleted();
+
         CLUSTER.createTopic(INPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_2);
@@ -403,6 +409,34 @@ public class ResetIntegrationTest {
         Assert.assertEquals(0, exitCode);
     }
 
+    private void waitUntilUserTopicsAreDeleted() {
+        ZkUtils zkUtils = null;
+        try {
+            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            while (userTopicExists(new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()))))
{
+                Utils.sleep(100);
+            }
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
+
+    private boolean userTopicExists(final Set<String> allTopics) {
+        final Set<String> expectedMissingTopics = new HashSet<>();
+        expectedMissingTopics.add(INPUT_TOPIC);
+        expectedMissingTopics.add(OUTPUT_TOPIC);
+        expectedMissingTopics.add(OUTPUT_TOPIC_2);
+        expectedMissingTopics.add(OUTPUT_TOPIC_2_RERUN);
+
+        return expectedMissingTopics.removeAll(allTopics);
+    }
+
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
         final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
         expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);


Mime
View raw message