kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: improve hanging ResetIntegrationTest
Date Wed, 22 Mar 2017 02:18:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b3beaebd4 -> 83824089f


MINOR: improve hanging ResetIntegrationTest

Sometimes `ResetIntegrationTest` hangs and thus the build times out. We suspect, that this
happens if no data is written into the input topics. Right now, input data is written once
and reused for both test cases. If for some reason, the broker gets recreated (between both
test cases), no data will be available for the second test method and thus the test hangs.

This change ensures, that input data is written for each test case individually.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma, Eno Thereska, Guozhang Wang

Closes #2630 from mjsax/minor-reset-integration-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83824089
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83824089
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83824089

Branch: refs/heads/trunk
Commit: 83824089f2371d6375c2713d494f276aec27c1a1
Parents: b3beaeb
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Mar 21 19:18:41 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Mar 21 19:18:41 2017 -0700

----------------------------------------------------------------------
 .../integration/ResetIntegrationTest.java       | 39 ++++++++++++++------
 1 file changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/83824089/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index ce29f32..695e900 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,6 +23,7 @@ import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -45,7 +46,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -94,14 +94,6 @@ public class ResetIntegrationTest {
     private final MockTime mockTime = CLUSTER.time;
     private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
 
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(INPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC_2);
-        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
-    }
-
     @AfterClass
     public static void globalCleanup() {
         if (adminClient != null) {
@@ -131,9 +123,7 @@ public class ResetIntegrationTest {
             break;
         }
 
-        if (testNo == 1) {
-            prepareInputData();
-        }
+        prepareInputData();
     }
 
     @Test
@@ -290,6 +280,31 @@ public class ResetIntegrationTest {
     }
 
     private void prepareInputData() throws Exception {
+        try {
+            CLUSTER.deleteTopic(INPUT_TOPIC);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        try {
+            CLUSTER.deleteTopic(OUTPUT_TOPIC);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        try {
+            CLUSTER.deleteTopic(OUTPUT_TOPIC_2);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        try {
+            CLUSTER.deleteTopic(OUTPUT_TOPIC_2_RERUN);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        CLUSTER.createTopic(INPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
+
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
LongSerializer.class, StringSerializer.class);
 
         mockTime.sleep(10);


Mime
View raw message