kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test
Date Fri, 07 Jun 2019 06:45:41 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 060701f  KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test
060701f is described below

commit 060701fb6f000046e19ccb94a5e366377d0a2599
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Fri Jun 7 12:15:03 2019 +0530

    KAFKA-8461: Wait for follower to join the ISR in testUncleanLeaderElectionDisabled Test
    
    Author: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Boyang Chen <boyang@confluent.io>
    
    Closes #6887 from omkreddy/unclean-leader
---
 .../kafka/integration/UncleanLeaderElectionTest.scala    | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index c0c8c95..64d7174 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -219,16 +219,16 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals(List("first"), consumeAllMessages(topic, 1))
 
     // shutdown follower server
-    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+    servers.filter(server => server.config.brokerId == followerId).foreach(server => shutdownServer(server))
 
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
 
     //remove any previous unclean election metric
-    servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+    servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
 
     // shutdown leader and then restart follower
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
     val followerServer = servers.find(_.config.brokerId == followerId).get
     followerServer.startup()
 
@@ -247,13 +247,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals(List.empty[String], consumeAllMessages(topic, 0))
 
     // restart leader temporarily to send a successfully replicated message
-    servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
+    servers.filter(server => server.config.brokerId == leaderId).foreach(server => server.startup())
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
 
     produceMessage(servers, topic, "third")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId)
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    //make sure follower server joins the ISR
+    TestUtils.waitUntilTrue(() => {
+      val partitionInfoOpt = followerServer.metadataCache.getPartitionInfo(topic, partitionId)
+      partitionInfoOpt.isDefined && partitionInfoOpt.get.basePartitionState.isr.contains(followerId)
+    }, "Inconsistent metadata after first server startup")
 
+    servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
     // verify clean leader transition to ISR follower
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
 


Mime
View raw message