kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.1 updated: MINOR: Add all topics created check streams broker bounce test (2.1) (#6242)
Date Thu, 21 Feb 2019 20:58:23 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2d94783  MINOR: Add all topics created check streams broker bounce test (2.1) (#6242)
2d94783 is described below

commit 2d94783325f8de3bedc20b84e586f0d18b48fee1
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Feb 21 15:58:13 2019 -0500

    MINOR: Add all topics created check streams broker bounce test (2.1) (#6242)
    
    The StreamsBrokerBounceTest.test_broker_type_bounce experienced what looked like a transient
failure. After looking over this test and the failure, it seems the test is vulnerable to a timing
error in which Streams starts before the Kafka service has created all topics.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>,
John Roesler <john@confluent.io>
---
 .../tests/streams/streams_broker_bounce_test.py    | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 8d623eb..7859d69 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -133,6 +133,22 @@ class StreamsBrokerBounceTest(Test):
         for num in range(0, num_failures - 1):
             signal_node(self, self.kafka.nodes[num], sig)
 
+    def confirm_topics_on_all_brokers(self, expected_topic_set):
+        for node in self.kafka.nodes:
+            match_count = 0
+            # need to iterate over topic_list_generator as kafka.list_topics()
+            # returns a python generator so values are fetched lazily
+            # so we can't just compare directly we must iterate over what's returned
+            topic_list_generator = self.kafka.list_topics("placeholder", node)
+            for topic in topic_list_generator:
+                if topic in expected_topic_set:
+                    match_count += 1
+
+            if len(expected_topic_set) != match_count:
+                return False
+
+        return True
+
         
     def setup_system(self, start_processor=True):
         # Setup phase
@@ -141,6 +157,12 @@ class StreamsBrokerBounceTest(Test):
 
         self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk,
topics=self.topics)
         self.kafka.start()
+
+        # allow some time for topics to be created
+        wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())),
+                   timeout_sec=60,
+                   err_msg="Broker did not create all topics in 60 seconds ")
+
         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)


Mime
View raw message