kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)
Date Mon, 06 Jul 2020 21:07:20 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 72dd93a  KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)
72dd93a is described below

commit 72dd93a3e832de7a29ff6360b8f316626d3e8013
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Mon Jul 6 14:04:36 2020 -0700

    KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)
    
    The current failures we're seeing with this test are due to faulty assumptions that it
makes and not any real bug in eos-beta (at least, from what I've seen so far).
    
    The test relies on tightly controlling the commits, which it does by setting the commit
interval to MAX_VALUE and manually requesting commits on the context. In two phases, the test
assumes that any pending data will be committed after a rebalance. But we actually take care
to avoid unnecessary commits -- with eos-alpha, we only commit tasks that are revoked, while
in eos-beta we must commit all tasks if any are revoked, but only if the revoked tasks themselves
need a commit.
    
    The failure we see occurs when we try to verify the committed data after a second client
is started and the group rebalances. The already-running client has to give up two tasks to
the newly started client, but those tasks may not need to be committed, in which case none
of the tasks would be committed. So we still have an open transaction on the partitions where we try
to read committed data.
    
    Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../integration/EosBetaUpgradeIntegrationTest.java | 49 ++++++++++++++++++++--
 .../integration/utils/IntegrationTestUtils.java    |  2 +-
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index 7e490b0..8444e73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Partitioner;
@@ -42,7 +43,10 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAss
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -80,6 +84,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -148,6 +153,27 @@ public class EosBetaUpgradeIntegrationTest {
     private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
     private final AtomicInteger commitRequested = new AtomicInteger(0);
 
+    // Note: this pattern only works when we just have a single instance running with a single
thread
+    // If we want to extend the test or reuse this CommitPunctuator we should tighten it
up
+    private final AtomicBoolean requestCommit = new AtomicBoolean(false);
+    private static class CommitPunctuator implements Punctuator {
+        final ProcessorContext context;
+        final AtomicBoolean requestCommit;
+
+        public CommitPunctuator(final ProcessorContext context, final AtomicBoolean requestCommit)
{
+            this.context = context;
+            this.requestCommit = requestCommit;
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            if (requestCommit.get()) {
+                context.commit();
+                requestCommit.set(false);
+            }
+        }
+    }
+
     private Throwable uncaughtException;
 
     private int testNumber = 0;
@@ -403,6 +429,8 @@ public class EosBetaUpgradeIntegrationTest {
             //   p-1: 10 rec + C + 5 rec + A + 5 rec ---> C
             //   p-2: 10 rec + C + 5 rec ---> C
             //   p-3: 10 rec + C + 5 rec ---> C
+            requestCommit.set(true);
+            waitForCondition(() -> !requestCommit.get(), "Punctuator did not request commit
for running client");
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -733,6 +761,8 @@ public class EosBetaUpgradeIntegrationTest {
             //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec
+ C + 5 rec ---> C
             //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec
+ A + 5 rec ---> C
             //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec
+ A + 5 rec ---> C
+            requestCommit.set(true);
+            waitForCondition(() -> !requestCommit.get(), "Punctuator did not request commit
for running client");
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -826,6 +856,7 @@ public class EosBetaUpgradeIntegrationTest {
                     KeyValueStore<Long, Long> state = null;
                     AtomicBoolean crash;
                     AtomicInteger sharedCommit;
+                    Cancellable punctuator;
 
                     @Override
                     public void init(final ProcessorContext context) {
@@ -839,6 +870,11 @@ public class EosBetaUpgradeIntegrationTest {
                             crash = errorInjectedClient2;
                             sharedCommit = commitCounterClient2;
                         }
+                        punctuator = context.schedule(
+                            Duration.ofMillis(100),
+                            PunctuationType.WALL_CLOCK_TIME,
+                            new CommitPunctuator(context, requestCommit)
+                        );
                     }
 
                     @Override
@@ -871,7 +907,9 @@ public class EosBetaUpgradeIntegrationTest {
                     }
 
                     @Override
-                    public void close() { }
+                    public void close() {
+                        punctuator.cancel();
+                    }
                 };
             } }, storeNames)
             .to(MULTI_PARTITION_OUTPUT_TOPIC);
@@ -1038,9 +1076,12 @@ public class EosBetaUpgradeIntegrationTest {
         return expectedResult;
     }
 
-    private Set<Long> keysFromInstance(final KafkaStreams streams) {
-        final ReadOnlyKeyValueStore<Long, Long> store =
-            streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
+    private Set<Long> keysFromInstance(final KafkaStreams streams) throws Exception
{
+        final ReadOnlyKeyValueStore<Long, Long> store = getStore(
+            MAX_WAIT_TIME_MS,
+            streams,
+            StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())
+        );
         final Set<Long> keys = new HashSet<>();
         try (final KeyValueIterator<Long, Long> it = store.all()) {
             while (it.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index c4cb121..0f3565d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1271,7 +1271,7 @@ public class IntegrationTestUtils {
          */
         public void waitForNextStableAssignment(final long maxWaitMs) throws InterruptedException
{
             waitForCondition(
-                () -> nextExpectedNumStableAssignments == numStableAssignments(),
+                () -> numStableAssignments() >= nextExpectedNumStableAssignments,
                 maxWaitMs,
                 () -> "Client did not reach " + nextExpectedNumStableAssignments + " stable
assignments on time, " +
                     "numStableAssignments was " + numStableAssignments()


Mime
View raw message