kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.4 updated: HOTFIX: fix system test race condition (#7836)
Date Tue, 07 Jan 2020 02:34:59 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 69f8fad  HOTFIX: fix system test race condition (#7836)
69f8fad is described below

commit 69f8fad99ce96b361fcd247ade29456b4583eb94
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Tue Dec 31 21:44:31 2019 -0500

    HOTFIX: fix system test race condition (#7836)
    
    In some system tests, a Streams app is started and then prints a message to stdout, which
the system test waits for to confirm that the node has been brought up successfully. The test
then greps for certain log messages in a retriable loop.
    
    But waiting for the Streams app to start and print to stdout does not guarantee that the
log file has been created yet, so the grep may return an error. Although this occurs in a
retriable loop, the code assumes that grep will not fail: the result is piped to wc and then
blindly converted to an int in the Python function, which throws a ValueError because the
error message is a string.
    
    We should catch the ValueError and return 0 so the test can try again rather than crash
immediately.
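
The failure mode described above can be reproduced without a cluster. The following is a minimal sketch, not the code from this commit: `parse_match_count`, `wait_for_matches`, and the module-level `logger` are hypothetical names introduced for illustration, and a module-level logger is used because a static method has no `self` to log through. A real retry loop would normally also sleep between attempts.

```python
import logging

# Hypothetical module-level logger (an assumption, not part of the commit).
logger = logging.getLogger(__name__)


def parse_match_count(raw_output):
    """Convert the output of `grep ... | wc -l` to an int, or 0 if it is not a number."""
    try:
        # int() tolerates surrounding whitespace, so "3\n" parses as 3.
        return int(raw_output)
    except ValueError:
        # The output was an error message (for example, the log file does not exist yet).
        logger.warning("Could not parse match count from: %r", raw_output)
        return 0


def wait_for_matches(count_matches, attempts=5):
    """Poll `count_matches` (a hypothetical callable returning raw command output)."""
    for _ in range(attempts):
        if parse_match_count(count_matches()) > 0:
            return True
    return False
```

With this shape, an attempt that sees an error string counts as "no matches yet", and the loop simply tries again.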
    
    Reviewers: Bill Bejeck <bbejeck@gmail.com>, John Roesler <vvcephei@users.noreply.github.com>,
Guozhang Wang <wangguoz@gmail.com>
---
 .../apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 2 +-
 tests/kafkatest/tests/streams/base_streams_test.py                  | 6 +++++-
 .../kafkatest/tests/streams/streams_broker_down_resilience_test.py  | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index b605f46..ac4e120 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -97,7 +97,7 @@ public class StreamsBrokerDownResilienceTest {
                 public void apply(final String key, final String value) {
                     System.out.println("received key " + key + " and value " + value);
                     messagesProcessed++;
-                    System.out.println("processed" + messagesProcessed + "messages");
+                    System.out.println("processed " + messagesProcessed + " messages");
                     System.out.flush();
                 }
             }).to(SINK_TOPIC);
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 53e4231..256693c 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -98,5 +98,9 @@ class BaseStreamsTest(KafkaTest):
     @staticmethod
     def verify_from_file(processor, message, file):
         result = processor.node.account.ssh_output("grep -E '%s' %s | wc -l" % (message, file), allow_fail=False)
-        return int(result)
+        try:
+          return int(result)
+        except ValueError:
+          self.logger.warn("Command failed with ValueError: " + result)
+          return 0
 
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index ee5feae..58f3b18 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -28,7 +28,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
     outputTopic = "streamsResilienceSink"
     client_id = "streams-broker-resilience-verify-consumer"
     num_messages = 10000
-    message = "processed[0-9]*messages"
+    message = "processed [0-9]* messages"
     connected_message = "Discovered group coordinator"
 
     def __init__(self, test_context):
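
The Java `println` change and the Python `message` pattern change in this diff have to land together, because each pattern matches only its own output format. A small sketch of that, assuming Python's `re` module behaves like `grep -E` for these simple patterns (it does for literal text and `[0-9]*`); the sample lines and the `matches` helper are hypothetical:

```python
import re

old_pattern = "processed[0-9]*messages"    # pattern before this commit
new_pattern = "processed [0-9]* messages"  # pattern after this commit

# Sample lines mimicking the Java output before and after the change.
old_line = "processed" + str(42) + "messages"    # no spaces around the count
new_line = "processed " + str(42) + " messages"  # spaces around the count


def matches(pattern, line):
    """Return True if `pattern` occurs anywhere in `line`, like grep would."""
    return re.search(pattern, line) is not None


for pattern in (old_pattern, new_pattern):
    for line in (old_line, new_line):
        print(repr(pattern), repr(line), matches(pattern, line))
```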

