kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519)
Date Tue, 15 Oct 2019 18:35:38 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new bb75897  KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING
upon start (#7519)
bb75897 is described below

commit bb758977e91c10b0e56da1c51dcaee4d6a094d73
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Oct 15 11:34:48 2019 -0700

    KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start
(#7519)
    
    I looked into the logs of the above tickets, and I think for a couple of them it is due
to the fact that the threads take time to restore, or just to stabilize the rebalance, since
there are multiple threads. Adding the hook to wait for state to transit to RUNNING upon starting.
    
    Reviewers: Chris Pettitt <cpettitt@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../integration/QueryableStateIntegrationTest.java | 33 ++++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index c5dbabe..de2ae27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -92,6 +93,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
 
 @Category({IntegrationTest.class})
 public class QueryableStateIntegrationTest {
@@ -261,6 +263,16 @@ public class QueryableStateIntegrationTest {
         @Override
         public void run() {
             myStream.start();
+
+            try {
+                TestUtils.waitForCondition(
+                    () -> stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING),
+                    "Did not start successfully after " + TestUtils.DEFAULT_MAX_WAIT_MS + " ms"
+                );
+            } catch (final InterruptedException e) {
+                if (!stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING))
+                    fail("Did not start successfully");
+            }
         }
 
         public void close() {
@@ -446,7 +458,8 @@ public class QueryableStateIntegrationTest {
             windowStoreName,
             streamsConfiguration);
 
-        kafkaStreams.start();
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
         producerThread.start();
 
         try {
@@ -515,7 +528,7 @@ public class QueryableStateIntegrationTest {
         t2.toStream().to(outputTopic);
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
@@ -581,7 +594,7 @@ public class QueryableStateIntegrationTest {
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
 
@@ -629,7 +642,7 @@ public class QueryableStateIntegrationTest {
         t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
@@ -690,7 +703,7 @@ public class QueryableStateIntegrationTest {
             .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
             .count(Materialized.as(windowStoreName));
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
@@ -716,7 +729,7 @@ public class QueryableStateIntegrationTest {
         final String storeName = "count-by-key";
         stream.groupByKey().count(Materialized.as(storeName));
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
         final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
         IntegrationTestUtils.produceKeyValuesSynchronously(
@@ -747,9 +760,9 @@ public class QueryableStateIntegrationTest {
         // close stream
         kafkaStreams.close();
 
-        // start again
+        // start again, and since it may take time to restore we wait for it to transit to RUNNING a bit longer
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams, maxWaitMs);
 
         // make sure we never get any value other than 8 for hello
         TestUtils.waitForCondition(
@@ -810,7 +823,9 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
-        kafkaStreams.start();
+
+        // since we start with two threads, wait for a bit longer for both of them to transit to running
+        startKafkaStreamsAndWaitForRunningState(kafkaStreams, 30000);
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,


Mime
View raw message