kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: improve hanging ResetIntegrationTest
Date Wed, 22 Mar 2017 02:20:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 51ffddcbc -> b3f5f9520


MINOR: improve hanging ResetIntegrationTest

Sometimes `ResetIntegrationTest` hangs and thus the build times out. We suspect that this
happens if no data is written into the input topics. Right now, input data is written once
and reused for both test cases. If, for some reason, the broker gets recreated between the two
test cases, no data will be available for the second test method and thus the test hangs.

This change ensures that input data is written for each test case individually.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma, Eno Thereska, Guozhang Wang

Closes #2630 from mjsax/minor-reset-integration-test

(cherry picked from commit 83824089f2371d6375c2713d494f276aec27c1a1)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3f5f952
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3f5f952
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3f5f952

Branch: refs/heads/0.10.2
Commit: b3f5f95203960ca338024cf0fc8ba437c55b9214
Parents: 51ffddc
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Mar 21 19:18:41 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Mar 21 19:19:56 2017 -0700

----------------------------------------------------------------------
 .../integration/ResetIntegrationTest.java       | 40 ++++++++++++++------
 1 file changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3f5f952/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index f1a8f68..35a58f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,6 +23,8 @@ import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -44,7 +46,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 
@@ -92,14 +93,6 @@ public class ResetIntegrationTest {
     private final MockTime mockTime = CLUSTER.time;
     private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
 
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(INPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC_2);
-        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
-    }
-
     @AfterClass
     public static void globalCleanup() {
         if (adminClient != null) {
@@ -131,9 +124,7 @@ public class ResetIntegrationTest {
             break;
         }
 
-        if (testNo == 1) {
-            prepareInputData();
-        }
+        prepareInputData();
     }
 
     @Test
@@ -290,6 +281,31 @@ public class ResetIntegrationTest {
     }
 
     private void prepareInputData() throws Exception {
+        try {
+            CLUSTER.deleteTopic(INPUT_TOPIC);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        try {
+            CLUSTER.deleteTopic(OUTPUT_TOPIC);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        try {
+            CLUSTER.deleteTopic(OUTPUT_TOPIC_2);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        try {
+            CLUSTER.deleteTopic(OUTPUT_TOPIC_2_RERUN);
+        } catch (final UnknownTopicOrPartitionException e) {
+            // ignore
+        }
+        CLUSTER.createTopic(INPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
+
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
LongSerializer.class, StringSerializer.class);
 
         mockTime.sleep(10);


Mime
View raw message