kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: fix EOS test race condition
Date Tue, 10 Oct 2017 17:03:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 dadae0a35 -> 1db2fc4fd


MINOR: fix EOS test race condition

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #4048 from mjsax/fix-eos-test-race-condition

(cherry picked from commit 64930cd71361853cc46ed02232213cc7ba749e77)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1db2fc4f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1db2fc4f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1db2fc4f

Branch: refs/heads/1.0
Commit: 1db2fc4fd72f3d3b4b737cac5dfd9c48b518747d
Parents: dadae0a
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Oct 10 10:03:02 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 10 10:03:10 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/tests/EosTestClient.java      | 25 +++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1db2fc4f/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 098b77b..5e85bd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import java.io.File;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class EosTestClient extends SmokeTestUtil {
 
@@ -35,6 +36,7 @@ public class EosTestClient extends SmokeTestUtil {
     private final String kafka;
     private final File stateDir;
     private final boolean withRepartitioning;
+    private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
 
     private KafkaStreams streams;
     private boolean uncaughtException;
@@ -46,7 +48,7 @@ public class EosTestClient extends SmokeTestUtil {
         this.withRepartitioning = withRepartitioning;
     }
 
-    private boolean isRunning = true;
+    private volatile boolean isRunning = true;
 
     public void start() {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -54,12 +56,18 @@ public class EosTestClient extends SmokeTestUtil {
             public void run() {
                 isRunning = false;
                 streams.close(TimeUnit.SECONDS.toMillis(300), TimeUnit.SECONDS);
+
+                // need to wait for callback to avoid race condition
+                // -> make sure the callback printout to stdout is there as it is expected test output
+                waitForStateTransitionCallback();
+
                 // do not remove these printouts since they are needed for health scripts
                 if (!uncaughtException) {
                     System.out.println(System.currentTimeMillis());
                     System.out.println("EOS-TEST-CLIENT-CLOSED");
                     System.out.flush();
                 }
+
             }
         }));
 
@@ -85,6 +93,9 @@ public class EosTestClient extends SmokeTestUtil {
                         System.out.println(System.currentTimeMillis());
                         System.out.println("StateChange: " + oldState + " -> " + newState);
                         System.out.flush();
+                        if (newState == KafkaStreams.State.NOT_RUNNING) {
+                            notRunningCallbackReceived.set(true);
+                        }
                     }
                 });
                 streams.start();
@@ -195,4 +206,16 @@ public class EosTestClient extends SmokeTestUtil {
         return new KafkaStreams(builder.build(), props);
     }
 
+    private void waitForStateTransitionCallback() {
+        final long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300);
+        while (!notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) {
+            try {
+                Thread.sleep(500);
+            } catch (final InterruptedException ignoreAndSwallow) { /* just keep waiting */ }
+        }
+        if (!notRunningCallbackReceived.get()) {
+            System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
+            System.err.flush();
+        }
+    }
 }


Mime
View raw message