kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9974: Fix flaky test by removing unneeded asserts (#8646)
Date Tue, 16 Jun 2020 17:24:43 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 4d7a54c  KAFKA-9974: Fix flaky test by removing unneeded asserts (#8646)
4d7a54c is described below

commit 4d7a54cfb143e55e3137aa59ad3f6ed18ebc3928
Author: showuon <43372967+showuon@users.noreply.github.com>
AuthorDate: Wed Jun 17 01:23:09 2020 +0800

    KAFKA-9974: Fix flaky test by removing unneeded asserts (#8646)
    
    The test failed at assertThat(listener.startOffset, is(equalTo(0L)));. It appears that
    a restore ran before the assert. A restore can legitimately happen at this point, since
    failed tasks may resume by restoring on their own, so it should not fail the test.
    
    The original test added assertThat(listener.startOffset, is(equalTo(0L))); because the
    end of the test also checked the startOffset value. In the newer version of the test,
    however, we no longer care about the startOffset or totalNumRestored values. All we
    want to verify in this test is:
    Assert that the current value in store reflects all messages being processed
    
    So removing the asserts avoids the flaky failure while still testing exactly what the
    test case is meant to test.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
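
For context, the listener in question records restore progress per store so the test can check it later. The following standalone sketch is a simplification — the real class is IntegrationTestUtils.TrackingStateRestoreListener in Kafka's test utilities, and the field and method names here are illustrative assumptions — showing why a self-triggered restore makes the removed asserts racy:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Simplified, JDK-only sketch of a tracking restore listener. The real
// implementation lives in Kafka's IntegrationTestUtils; names are illustrative.
public class TrackingListenerSketch {
    // first starting offset observed per store partition when a restore begins
    private final Map<String, Long> startOffsets = new ConcurrentHashMap<>();
    // total number of records restored across all partitions
    private final LongAdder totalRestored = new LongAdder();

    public void onRestoreStart(String partition, long startingOffset) {
        startOffsets.putIfAbsent(partition, startingOffset);
    }

    public void onBatchRestored(String partition, long numRestored) {
        totalRestored.add(numRestored);
    }

    public boolean allStartOffsetsAtZero() {
        return startOffsets.values().stream().allMatch(o -> o == 0L);
    }

    public long totalNumRestored() {
        return totalRestored.sum();
    }

    public static void main(String[] args) {
        TrackingListenerSketch listener = new TrackingListenerSketch();
        // A failed task that resumes on its own may restore before the test's
        // mid-point assert runs -- exactly the race the removed asserts hit:
        listener.onRestoreStart("store-0", 0L);
        listener.onBatchRestored("store-0", 100L);
        System.out.println(listener.allStartOffsetsAtZero()); // prints "true"
        // totalNumRestored() is now 100, so an assert that it equals 0L
        // would fail even though the application behaved correctly.
        System.out.println(listener.totalNumRestored());      // prints "100"
    }
}
```

Because such a restore is legitimate, asserting that no restore has happened yet ties the test to timing it does not control, which is the flakiness the commit removes.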
---
 .../streams/integration/OptimizedKTableIntegrationTest.java      | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index ea24090..fbf40e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertTrue;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -46,7 +45,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KeyQueryMetadata;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.TrackingStateRestoreListener;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -107,9 +105,7 @@ public class OptimizedKTableIntegrationTest {
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
         final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
-        final TrackingStateRestoreListener listener = new TrackingStateRestoreListener();
 
-        kafkaStreamsList.forEach(kafkaStreams -> kafkaStreams.setGlobalStateRestoreListener(listener));
         startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
 
         produceValueRange(key, 0, batch1NumMessages);
@@ -131,11 +127,6 @@ public class OptimizedKTableIntegrationTest {
             kafkaStreams1WasFirstActive = false;
         }
 
-        // Assert that no restore has occurred, ensures that when we check later that the restore
-        // notification actually came from after the rebalance.
-        assertTrue(listener.allStartOffsetsAtZero());
-        assertThat(listener.totalNumRestored(), is(equalTo(0L)));
-
         // Assert that the current value in store reflects all messages being processed
         assertThat(kafkaStreams1WasFirstActive ? store1.get(key) : store2.get(key), is(equalTo(batch1NumMessages - 1)));
 
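The surviving assertion checks only the final store contents. More generally, Kafka's integration tests avoid asserting on transient state by polling until the terminal condition holds (the project's own TestUtils.waitForCondition works this way). A minimal JDK-only sketch of that poll-until-true pattern, with hypothetical names and assuming nothing beyond the standard library:

```java
import java.util.function.BooleanSupplier;

// Illustrative sketch of the poll-until-condition pattern used by integration
// tests; this helper and its names are assumptions, not the Kafka utility.
public class WaitUntil {
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;  // condition met, e.g. the store has caught up
            }
            Thread.sleep(10); // back off briefly before re-checking
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a store that only reflects all messages after a short delay.
        long start = System.currentTimeMillis();
        boolean caughtUp =
            waitUntil(() -> System.currentTimeMillis() - start > 50, 1000);
        System.out.println(caughtUp); // prints "true"
    }
}
```

Asserting the eventual state this way is robust to restores or rebalances happening at unpredictable times mid-test, which is the same principle behind dropping the intermediate asserts above.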

