kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: fixes on streams upgrade test (#5754)
Date Sun, 14 Oct 2018 05:39:33 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d77746  MINOR: fixes on streams upgrade test (#5754)
2d77746 is described below

commit 2d77746a7beeb96d04ff3a907cde78f126cf6e85
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sat Oct 13 22:39:24 2018 -0700

    MINOR: fixes on streams upgrade test (#5754)
    
    1. In test_upgrade_downgrade_brokers, allow duplicates to happen.
    2. In test_version_probing_upgrade, grep the generation numbers from brokers at the end,
and check if they can ever be synchronized.
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Bill Bejeck <bill@confluent.io>
---
 .../tests/streams/streams_upgrade_test.py          | 52 ++++++++++++++--------
 1 file changed, 34 insertions(+), 18 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 39e21bf..4314a35 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -122,7 +122,7 @@ class StreamsUpgradeTest(Test):
         self.processor1.stop()
 
         node = self.driver.node
-        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        node.account.ssh("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s"
% self.driver.STDOUT_FILE, allow_fail=False)
         self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE,
allow_fail=False)
 
     @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
@@ -470,8 +470,6 @@ class StreamsUpgradeTest(Test):
                     self.old_processors.remove(processor)
                     self.upgraded_processors.append(processor)
 
-                    current_generation = current_generation + 1
-
                     log_monitor.wait_until("Kafka version : " + str(DEV_VERSION),
                                            timeout_sec=60,
                                            err_msg="Could not detect Kafka Streams version
" + str(DEV_VERSION) + " in " + str(node.account))
@@ -480,16 +478,6 @@ class StreamsUpgradeTest(Test):
                                            timeout_sec=60,
                                            err_msg="Could not detect FutureStreamsPartitionAssignor
in " + str(node.account))
 
-                    log_monitor.wait_until("Successfully joined group with generation " +
str(current_generation),
-                                           timeout_sec=60,
-                                           err_msg="Never saw output 'Successfully joined
group with generation " + str(current_generation) + "' on" + str(node.account))
-                    first_other_monitor.wait_until("Successfully joined group with generation
" + str(current_generation),
-                                                   timeout_sec=60,
-                                                   err_msg="Never saw output 'Successfully
joined group with generation " + str(current_generation) + "' on" + str(first_other_node.account))
-                    second_other_monitor.wait_until("Successfully joined group with generation
" + str(current_generation),
-                                                    timeout_sec=60,
-                                                    err_msg="Never saw output 'Successfully
joined group with generation " + str(current_generation) + "' on" + str(second_other_node.account))
-
                     if processor == self.leader:
                         self.update_leader()
                     else:
@@ -533,12 +521,34 @@ class StreamsUpgradeTest(Test):
                                            err_msg="Could not detect 'Triggering new rebalance'
at upgrading node " + str(node.account))
 
                     # version probing should trigger second rebalance
-                    current_generation = current_generation + 1
+                    # now we check that after consecutive rebalances we have synchronized
generation
+                    generation_synchronized = False
+                    retries = 0
 
-                    for p in self.processors:
-                        monitors[p].wait_until("Successfully joined group with generation
" + str(current_generation),
-                                               timeout_sec=60,
-                                               err_msg="Never saw output 'Successfully joined
group with generation " + str(current_generation) + "' on" + str(p.node.account))
+                    while retries < 10:
+                        processor_found = self.extract_generation_from_logs(processor)
+                        first_other_processor_found = self.extract_generation_from_logs(first_other_processor)
+                        second_other_processor_found = self.extract_generation_from_logs(second_other_processor)
+
+                        if len(processor_found) > 0 and len(first_other_processor_found)
> 0 and len(second_other_processor_found) > 0:
+                            self.logger.info("processor: " + str(processor_found))
+                            self.logger.info("first other processor: " + str(first_other_processor_found))
+                            self.logger.info("second other processor: " + str(second_other_processor_found))
+
+                            processor_generation = self.extract_highest_generation(processor_found)
+                            first_other_processor_generation = self.extract_highest_generation(first_other_processor_found)
+                            second_other_processor_generation = self.extract_highest_generation(second_other_processor_found)
+
+                            if processor_generation == first_other_processor_generation and
processor_generation == second_other_processor_generation:
+                                current_generation = processor_generation
+                                generation_synchronized = True
+                                break
+
+                        time.sleep(5)
+                        retries = retries + 1
+
+                    if generation_synchronized == False:
+                        raise Exception("Never saw all three processors have the synchronized
generation number")
 
                     if processor == self.leader:
                         self.update_leader()
@@ -550,6 +560,12 @@ class StreamsUpgradeTest(Test):
 
         return current_generation
 
+    def extract_generation_from_logs(self, processor):
+        return list(processor.node.account.ssh_capture("grep \"Successfully joined group
with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1;
if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i };
for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))
+
+    def extract_highest_generation(self, found_generations):
+        return int(found_generations[-1])
+
     def verify_metadata_no_upgraded_yet(self):
         for p in self.processors:
             found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription
and group leader.s latest supported version is 5. Upgrading subscription metadata version
to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))


Mime
View raw message