kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 0.10.2 updated: MINOR: Fix race condition in TestVerifiableProducer sanity test
Date Tue, 03 Jul 2018 20:23:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
     new 5ef6e97  MINOR: Fix race condition in TestVerifiableProducer sanity test
5ef6e97 is described below

commit 5ef6e97a4eb6923f784669d843edf0da26bd4a59
Author: Magnus Edenhill <magnus@edenhill.se>
AuthorDate: Sun May 21 18:23:12 2017 -0700

    MINOR: Fix race condition in TestVerifiableProducer sanity test
    
    ## Fixes race condition in TestVerifiableProducer sanity test:
    The test starts a producer, waits for at least 5 acks, and then
    logs in to the worker to grep for the producer process to figure
    out what version it is running.
    
    The problem was that the producer was set up to produce 1000 messages
    at a rate of 1000 msgs/s and then exit. This means it will have a
    typical runtime slightly above 1 second.
    
    Logging in to the vagrant instance might take longer than that thus
    resulting in the process grep to fail, failing the test.
    
    This commit doesn't really fix the issue - a proper fix would be to tell
    the producer to stick around until explicitly killed - but it increases
    the chances of the test passing, at the expense of a slightly longer
    runtime.
    
    ## Improves error reporting when is_version() fails
    
    Author: Magnus Edenhill <magnus@edenhill.se>
    
    Reviewers: Apurva Mehta <apurva@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #2765 from edenhill/trunk
---
 tests/kafkatest/sanity_checks/test_kafka_version.py       |  6 +++---
 tests/kafkatest/sanity_checks/test_verifiable_producer.py | 12 +++++++++---
 tests/kafkatest/utils/util.py                             |  8 ++++++--
 3 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py
index 7e65807..ca58ca5 100644
--- a/tests/kafkatest/sanity_checks/test_kafka_version.py
+++ b/tests/kafkatest/sanity_checks/test_kafka_version.py
@@ -42,7 +42,7 @@ class KafkaVersionTest(Test):
         node.version = LATEST_0_8_2
         self.kafka.start()
 
-        assert is_version(node, [LATEST_0_8_2])
+        assert is_version(node, [LATEST_0_8_2], logger=self.logger)
 
     @cluster(num_nodes=3)
     def test_multi_version(self):
@@ -54,5 +54,5 @@ class KafkaVersionTest(Test):
         self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
         self.kafka.start()
 
-        assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring])
-        assert is_version(self.kafka.nodes[1], [LATEST_0_8_2])
+        assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring], logger=self.logger)
+        assert is_version(self.kafka.nodes[1], [LATEST_0_8_2], logger=self.logger)
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index be10574..a008532 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -39,7 +39,7 @@ class TestVerifiableProducer(Test):
         self.num_messages = 1000
         # This will produce to source kafka cluster
         self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
-                                           max_messages=self.num_messages, throughput=1000)
+                                           max_messages=self.num_messages, throughput=self.num_messages/5)
 
     def setUp(self):
         self.zk.start()
@@ -66,10 +66,16 @@ class TestVerifiableProducer(Test):
         # that this check works with DEV_BRANCH
         # When running VerifiableProducer 0.8.X, both the current branch version and 0.8.X
should show up because of the
         # way verifiable producer pulls in some development directories into its classpath
+        #
+        # If the test fails here because 'ps .. | grep' couldn't find the process it means
+        # the login and grep that is_version() performs is slower than
+        # the time it takes the producer to produce its messages.
+        # Easy fix is to decrease throughput= above, the good fix is to make the producer
+        # not terminate until explicitly killed in this case.
         if node.version <= LATEST_0_8_2:
-            assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring])
+            assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring], logger=self.logger)
         else:
-            assert is_version(node, [node.version.vstring])
+            assert is_version(node, [node.version.vstring], logger=self.logger)
 
         self.producer.wait()
         num_produced = self.producer.num_acked
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index f004ece..3be8d80 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -31,7 +31,7 @@ def _kafka_jar_versions(proc_string):
     return set(versions)
 
 
-def is_version(node, version_list, proc_grep_string="kafka"):
+def is_version(node, version_list, proc_grep_string="kafka", logger=None):
     """Heuristic to check that only the specified version appears in the classpath of the
process
     A useful tool to aid in checking that service version apis are working correctly.
     """
@@ -39,7 +39,11 @@ def is_version(node, version_list, proc_grep_string="kafka"):
     assert len(lines) == 1
 
     versions = _kafka_jar_versions(lines[0])
-    return versions == {str(v) for v in version_list}
+    r = versions == {str(v) for v in version_list}
+    if not r and logger is not None:
+        logger.warning("%s: %s version mismatch: expected %s: actual %s" % \
+                       (str(node), proc_grep_string, version_list, versions))
+    return r
 
 
 def is_int(msg):


Mime
View raw message