kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Fix race condition in TestVerifiableProducer sanity test
Date Mon, 22 May 2017 01:23:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d190d89db -> dc10b0ea0


MINOR: Fix race condition in TestVerifiableProducer sanity test

## Fixes race condition in TestVerifiableProducer sanity test:
The test starts a producer, waits for at least 5 acks, and then
logs in to the worker to grep for the producer process to figure
out what version it is running.

The problem was that the producer was set up to produce 1000 messages
at a rate of 1000 msgs/s and then exit. This means it will have a
typical runtime slightly above 1 second.

Logging in to the Vagrant instance might take longer than that, causing
the process grep to find nothing and the test to fail.

This commit doesn't really fix the issue - a proper fix would be to tell
the producer to stick around until explicitly killed - but it increases
the chances of the test passing, at the expense of a slightly longer
runtime.

## Improves error reporting when is_version() fails

Author: Magnus Edenhill <magnus@edenhill.se>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2765 from edenhill/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc10b0ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc10b0ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc10b0ea

Branch: refs/heads/trunk
Commit: dc10b0ea014974b450f2abcd4a58e09d5b988e9e
Parents: d190d89
Author: Magnus Edenhill <magnus@edenhill.se>
Authored: Sun May 21 18:23:12 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Sun May 21 18:23:12 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/sanity_checks/test_kafka_version.py     |  6 +++---
 .../kafkatest/sanity_checks/test_verifiable_producer.py | 12 +++++++++---
 tests/kafkatest/utils/util.py                           |  8 ++++++--
 3 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc10b0ea/tests/kafkatest/sanity_checks/test_kafka_version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py
index 7e65807..ca58ca5 100644
--- a/tests/kafkatest/sanity_checks/test_kafka_version.py
+++ b/tests/kafkatest/sanity_checks/test_kafka_version.py
@@ -42,7 +42,7 @@ class KafkaVersionTest(Test):
         node.version = LATEST_0_8_2
         self.kafka.start()
 
-        assert is_version(node, [LATEST_0_8_2])
+        assert is_version(node, [LATEST_0_8_2], logger=self.logger)
 
     @cluster(num_nodes=3)
     def test_multi_version(self):
@@ -54,5 +54,5 @@ class KafkaVersionTest(Test):
         self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
         self.kafka.start()
 
-        assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring])
-        assert is_version(self.kafka.nodes[1], [LATEST_0_8_2])
+        assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring], logger=self.logger)
+        assert is_version(self.kafka.nodes[1], [LATEST_0_8_2], logger=self.logger)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc10b0ea/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index be10574..a008532 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -39,7 +39,7 @@ class TestVerifiableProducer(Test):
         self.num_messages = 1000
         # This will produce to source kafka cluster
         self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
-                                           max_messages=self.num_messages, throughput=1000)
+                                           max_messages=self.num_messages, throughput=self.num_messages/5)
 
     def setUp(self):
         self.zk.start()
@@ -66,10 +66,16 @@ class TestVerifiableProducer(Test):
         # that this check works with DEV_BRANCH
         # When running VerifiableProducer 0.8.X, both the current branch version and 0.8.X should show up because of the
         # way verifiable producer pulls in some development directories into its classpath
+        #
+        # If the test fails here because 'ps .. | grep' couldn't find the process it means
+        # the login and grep that is_version() performs is slower than
+        # the time it takes the producer to produce its messages.
+        # Easy fix is to decrease throughput= above, the good fix is to make the producer
+        # not terminate until explicitly killed in this case.
         if node.version <= LATEST_0_8_2:
-            assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring])
+            assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring], logger=self.logger)
         else:
-            assert is_version(node, [node.version.vstring])
+            assert is_version(node, [node.version.vstring], logger=self.logger)
 
         self.producer.wait()
         num_produced = self.producer.num_acked

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc10b0ea/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index f004ece..3be8d80 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -31,7 +31,7 @@ def _kafka_jar_versions(proc_string):
     return set(versions)
 
 
-def is_version(node, version_list, proc_grep_string="kafka"):
+def is_version(node, version_list, proc_grep_string="kafka", logger=None):
     """Heuristic to check that only the specified version appears in the classpath of the process
     A useful tool to aid in checking that service version apis are working correctly.
     """
@@ -39,7 +39,11 @@ def is_version(node, version_list, proc_grep_string="kafka"):
     assert len(lines) == 1
 
     versions = _kafka_jar_versions(lines[0])
-    return versions == {str(v) for v in version_list}
+    r = versions == {str(v) for v in version_list}
+    if not r and logger is not None:
+        logger.warning("%s: %s version mismatch: expected %s: actual %s" % \
+                       (str(node), proc_grep_string, version_list, versions))
+    return r
 
 
 def is_int(msg):


Mime
View raw message