kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.3 updated: HOTFIX: fix system test race condition (#7836)
Date Tue, 07 Jan 2020 02:35:27 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 8cebea7  HOTFIX: fix system test race condition (#7836)
8cebea7 is described below

commit 8cebea76ff739e979adaf41bd11ab580d452c0b6
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Tue Dec 31 21:44:31 2019 -0500

    HOTFIX: fix system test race condition (#7836)
    
    In some system tests a Streams app is started and then prints a message to stdout, which
    the system test waits for to confirm the node has been brought up successfully. The test
    then greps for certain log messages in a retriable loop.
    
    But waiting for the Streams app to start and print to stdout does not guarantee that the
    log file has been created yet, so the grep may return an error. Although this occurs in a
    retriable loop, it is assumed that grep will not fail: the result is piped to wc and then
    blindly converted to an int in the python function, which throws a ValueError since the
    error message is a string.
    
    We should catch the ValueError and return 0 so the loop can try again rather than crash
    immediately.
    
    Reviewers: Bill Bejeck <bbejeck@gmail.com>, John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
---
 .../apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 2 +-
 tests/kafkatest/tests/streams/base_streams_test.py                  | 6 +++++-
 .../kafkatest/tests/streams/streams_broker_down_resilience_test.py  | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index 25c642e..38480c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -97,7 +97,7 @@ public class StreamsBrokerDownResilienceTest {
                 public void apply(final String key, final String value) {
                     System.out.println("received key " + key + " and value " + value);
                     messagesProcessed++;
-                    System.out.println("processed" + messagesProcessed + "messages");
+                    System.out.println("processed " + messagesProcessed + " messages");
                     System.out.flush();
                 }
             }).to(SINK_TOPIC);
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 53e4231..256693c 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -98,5 +98,9 @@ class BaseStreamsTest(KafkaTest):
     @staticmethod
     def verify_from_file(processor, message, file):
         result = processor.node.account.ssh_output("grep -E '%s' %s | wc -l" % (message, file), allow_fail=False)
-        return int(result)
+        try:
+          return int(result)
+        except ValueError:
+          self.logger.warn("Command failed with ValueError: " + result)
+          return 0
 
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index ee5feae..58f3b18 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -28,7 +28,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
     outputTopic = "streamsResilienceSink"
     client_id = "streams-broker-resilience-verify-consumer"
     num_messages = 10000
-    message = "processed[0-9]*messages"
+    message = "processed [0-9]* messages"
     connected_message = "Discovered group coordinator"
 
     def __init__(self, test_context):

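The parsing change described in the commit message can be sketched in isolation as follows. This is a minimal illustration, not the harness code itself: `count_matches` is a hypothetical stand-in for the parsing step inside `verify_from_file`.

```python
# Standalone sketch of the pattern this fix applies: the output of a
# `grep ... | wc -l` pipeline is parsed as an int, and non-numeric output
# (e.g. a grep error message when the log file does not exist yet) is
# reported as zero matches, so a retriable loop can try again instead of
# crashing with a ValueError.

def count_matches(raw_output):
    """Parse the output of `grep ... | wc -l`, tolerating error text."""
    try:
        return int(raw_output)
    except ValueError:
        # The log file may not have been created yet; report 0 so the
        # caller's retry loop polls again rather than failing the test.
        return 0
```

For example, `count_matches("3\n")` returns 3 (int() ignores surrounding whitespace), while `count_matches("grep: app.log: No such file or directory")` returns 0 and lets the loop retry.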
