kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.2 updated: MINOR: followup to Version Probing improvements2.2 (#7448)
Date Fri, 04 Oct 2019 22:58:56 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 79d0f55  MINOR: followup to Version Probing improvements2.2  (#7448)
79d0f55 is described below

commit 79d0f55ba7fe4e9d0a07026cb819b421fa0f7c00
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Fri Oct 4 15:58:38 2019 -0700

    MINOR: followup to Version Probing improvements2.2  (#7448)
    
    Small follow-up to trunk PR #7426
    
    While debugging the 2.3 VP PR we realized we should remove the leader-tracking from the
VP system test altogether. We'd already merged the corresponding trunk PR so I made a quick
new PR for trunk.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../tests/streams/streams_upgrade_test.py          | 44 ++--------------------
 1 file changed, 3 insertions(+), 41 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 7b9a310..c315f72 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -84,8 +84,6 @@ class StreamsUpgradeTest(Test):
             'echo' : { 'partitions': 5 },
             'data' : { 'partitions': 5 },
         }
-        self.leader = None
-        self.leader_counter = {}
 
     processed_msg = "processed [0-9]* records"
 
@@ -311,13 +309,6 @@ class StreamsUpgradeTest(Test):
         self.processors = [self.processor1, self.processor2, self.processor3]
         self.old_processors = [self.processor1, self.processor2, self.processor3]
         self.upgraded_processors = []
-        for p in self.processors:
-            self.leader_counter[p] = 2
-
-        self.update_leader()
-        for p in self.processors:
-            self.leader_counter[p] = 0
-        self.leader_counter[self.leader] = 3
 
         counter = 1
         current_generation = 3
@@ -342,25 +333,6 @@ class StreamsUpgradeTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED'
on" + str(node.account))
 
-    def update_leader(self):
-        self.leader = None
-        retries = 10
-        while retries > 0:
-            for p in self.processors:
-                found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\"
%s" % p.LOG_FILE, allow_fail=True))
-                if len(found) >= self.leader_counter[p] + 1:
-                    self.leader = p
-                    self.leader_counter[p] = self.leader_counter[p] + 1
-
-            if self.leader is None:
-                retries = retries - 1
-                time.sleep(5)
-            else:
-                break
-
-        if self.leader is None:
-            raise Exception("Could not identify leader")
-
     def get_version_string(self, version):
         if version.startswith("0") or version.startswith("1") \
           or version.startswith("2.0") or version.startswith("2.1"):
@@ -524,7 +496,6 @@ class StreamsUpgradeTest(Test):
                 node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE
+ "." + str(counter), allow_fail=False)
                 node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE
+ "." + str(counter), allow_fail=False)
                 node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE +
"." + str(counter), allow_fail=False)
-                self.leader_counter[processor] = 0
 
                 with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
                     processor.set_upgrade_to("future_version")
@@ -540,11 +511,6 @@ class StreamsUpgradeTest(Test):
                                            timeout_sec=60,
                                            err_msg="Could not detect FutureStreamsPartitionAssignor
in " + str(node.account))
 
-                    if processor == self.leader:
-                        self.update_leader()
-                    else:
-                        self.leader_counter[self.leader] = self.leader_counter[self.leader]
+ 1
-
                     monitors = {}
                     monitors[processor] = log_monitor
                     monitors[first_other_processor] = first_other_monitor
@@ -598,12 +564,8 @@ class StreamsUpgradeTest(Test):
                     if generation_synchronized == False:
                         raise Exception("Never saw all three processors have the synchronized
generation number")
 
-                    if processor == self.leader:
-                        self.update_leader()
-                    else:
-                        self.leader_counter[self.leader] = self.leader_counter[self.leader]
+ 1
 
-                    if self.leader in self.old_processors or len(self.old_processors) >
0:
+                    if len(self.old_processors) > 0:
                         self.verify_metadata_no_upgraded_yet()
 
         return current_generation
@@ -616,9 +578,9 @@ class StreamsUpgradeTest(Test):
 
     def verify_metadata_no_upgraded_yet(self):
         for p in self.processors:
-            found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription
and group leader.s latest supported version is 5. Upgrading subscription metadata version
to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
+            found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription
and group.s latest commonly supported version is 5 (successful version probing and end of
rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE,
allow_fail=True))
             if len(found) > 0:
-                raise Exception("Kafka Streams failed with 'group member upgraded to metadata
4 too early'")
+                raise Exception("Kafka Streams failed with 'group member upgraded to metadata
5 too early'")
 
     def get_topics_count(self):
         count = 0


Mime
View raw message