kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7965; Fix testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup (#6557)
Date Wed, 17 Apr 2019 19:41:57 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a05eaaa  KAFKA-7965; Fix testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
(#6557)
a05eaaa is described below

commit a05eaaa8f41db9fca86020845a3336acc105ee19
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Thu Apr 18 03:41:46 2019 +0800

    KAFKA-7965; Fix testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
(#6557)
    
    Most of the time, the group coordinator runs on broker 1. Occasionally the group coordinator
will be placed on broker 2. If that's the case, the loop starting at line 320 have no chance
to check and update `kickedOutConsumerIdx`. A quick fix is to safely do another round of loop
to ensure `kickedOutConsumerIdx` always be checked after the last broker restart.
    
    Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson
<jason@confluent.io>
---
 .../src/test/scala/integration/kafka/api/ConsumerBounceTest.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index eabd515..2ce5fab 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -316,8 +316,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     // roll all brokers with a lesser max group size to make sure coordinator has the new
config
     val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
     var kickedOutConsumerIdx: Option[Int] = None
+    val holdingGroupBrokers = servers.filter(!_.groupCoordinator.groupManager.currentGroups.isEmpty).map(_.config.brokerId)
+    // should only have one broker holding the group metadata
+    assertEquals(holdingGroupBrokers.size, 1)
+    val coordinator = holdingGroupBrokers.head
+    // ensure the coordinator broker will be restarted first
+    val orderedBrokersIds = List(coordinator) ++ servers.indices.toBuffer.filter(_ != coordinator)
     // restart brokers until the group moves to a Coordinator with the new config
-    breakable { for (broker <- servers.indices) {
+    breakable { for (broker <- orderedBrokersIds) {
       killBroker(broker)
       consumerPollers.indices.foreach(idx => {
         consumerPollers(idx).thrownException match {


Mime
View raw message