kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Refactor admin client helpers for checking leader and ISR (#7074)
Date Sat, 13 Jul 2019 07:50:51 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f2d230  MINOR: Refactor admin client helpers for checking leader and ISR (#7074)
1f2d230 is described below

commit 1f2d230bfdaafb34c9be12a370ab2eb4d3016039
Author: José Armando García Sancio <jsancio@users.noreply.github.com>
AuthorDate: Sat Jul 13 00:50:27 2019 -0700

    MINOR: Refactor admin client helpers for checking leader and ISR (#7074)
    
    Reviewers: Vikas Singh <soondenana@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/api/AdminClientIntegrationTest.scala     | 135 ++++++++-------------
 .../kafka/admin/LeaderElectionCommandTest.scala    |  74 +++--------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  58 ++++++++-
 3 files changed, 124 insertions(+), 143 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index c0a38f5..e61f03f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1280,8 +1280,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     /** Changes the <i>preferred</i> leader without changing the <i>current</i>
leader. */
     def changePreferredLeader(newAssignment: Seq[Int]) = {
       val preferred = newAssignment.head
-      val prior1 = currentLeader(client, partition1).get
-      val prior2 = currentLeader(client, partition2).get
+      val prior1 = TestUtils.currentLeader(client, partition1).get
+      val prior2 = TestUtils.currentLeader(client, partition2).get
 
       var m = Map.empty[TopicPartition, Seq[Int]]
 
@@ -1296,26 +1296,26 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
         s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)}
and ${preferredLeader(partition2)}",
         10000)
       // Check the leader hasn't moved
-      assertEquals(Some(prior1), currentLeader(client, partition1))
-      assertEquals(Some(prior2), currentLeader(client, partition2))
+      assertEquals(Some(prior1), TestUtils.currentLeader(client, partition1))
+      assertEquals(Some(prior2), TestUtils.currentLeader(client, partition2))
     }
 
     // Check current leaders are 0
-    assertEquals(Some(0), currentLeader(client, partition1))
-    assertEquals(Some(0), currentLeader(client, partition2))
+    assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
+    assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
 
     // Noop election
     var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
     var exception = electResult.partitions.get.get(partition1).get
     assertEquals(classOf[ElectionNotNeededException], exception.getClass)
     assertEquals("Leader election not needed for topic partition", exception.getMessage)
-    assertEquals(Some(0), currentLeader(client, partition1))
+    assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
 
     // Noop election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertTrue(electResult.partitions.get.isEmpty)
-    assertEquals(Some(0), currentLeader(client, partition1))
-    assertEquals(Some(0), currentLeader(client, partition2))
+    assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
+    assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
 
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
@@ -1324,17 +1324,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
     assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
-    waitForLeaderToBecome(client, partition1, Some(1))
+    TestUtils.waitForLeaderToBecome(client, partition1, Some(1))
 
     // topic 2 unchanged
     assertFalse(electResult.partitions.get.containsKey(partition2))
-    assertEquals(Some(0), currentLeader(client, partition2))
+    assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
 
     // meaningful election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    waitForLeaderToBecome(client, partition2, Some(1))
+    TestUtils.waitForLeaderToBecome(client, partition2, Some(1))
 
     // unknown topic
     val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
@@ -1343,8 +1343,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     exception = electResult.partitions.get.get(unknownPartition).get
     assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
     assertEquals("The partition does not exist.", exception.getMessage)
-    assertEquals(Some(1), currentLeader(client, partition1))
-    assertEquals(Some(1), currentLeader(client, partition2))
+    assertEquals(Some(1), TestUtils.currentLeader(client, partition1))
+    assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
 
     // Now change the preferred leader to 2
     changePreferredLeader(prefer2)
@@ -1352,8 +1352,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     // mixed results
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition, partition1).asJava)
     assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet)
-    waitForLeaderToBecome(client, partition1, Some(2))
-    assertEquals(Some(1), currentLeader(client, partition2))
+    TestUtils.waitForLeaderToBecome(client, partition1, Some(2))
+    assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
     exception = electResult.partitions.get.get(unknownPartition).get
     assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
     assertEquals("The partition does not exist.", exception.getMessage)
@@ -1362,13 +1362,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava)
     assertEquals(Set(partition2).asJava, electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    waitForLeaderToBecome(client, partition2, Some(2))
+    TestUtils.waitForLeaderToBecome(client, partition2, Some(2))
 
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
     // but shut it down...
     servers(1).shutdown()
-    waitForBrokerOutOfIsr(client, Set(partition1, partition2), 1)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
 
     // ... now what happens if we try to elect the preferred leader and it's down?
     val shortTimeout = new ElectLeadersOptions().timeoutMs(10000)
@@ -1378,7 +1378,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
     assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
-    assertEquals(Some(2), currentLeader(client, partition1))
+    assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
 
     // preferred leader unavailable with null argument
     electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout)
@@ -1393,8 +1393,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
 
-    assertEquals(Some(2), currentLeader(client, partition1))
-    assertEquals(Some(2), currentLeader(client, partition2))
+    assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
+    assertEquals(Some(2), TestUtils.currentLeader(client, partition2))
   }
 
   @Test
@@ -1409,17 +1409,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     val partition1 = new TopicPartition("unclean-test-topic-1", 0)
     TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition
-> assignment1), servers)
 
-    waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
 
     servers(broker2).shutdown()
-    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    waitForLeaderToBecome(client, partition1, None)
+    TestUtils.waitForLeaderToBecome(client, partition1, None)
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
-    assertEquals(Option(broker2), currentLeader(client, partition1))
+    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
   }
 
   @Test
@@ -1443,21 +1443,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    waitForLeaderToBecome(client, partition1, Option(broker1))
-    waitForLeaderToBecome(client, partition2, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
 
     servers(broker2).shutdown()
-    waitForBrokerOutOfIsr(client, Set(partition1, partition2), broker2)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
     servers(broker1).shutdown()
-    waitForLeaderToBecome(client, partition1, None)
-    waitForLeaderToBecome(client, partition2, None)
+    TestUtils.waitForLeaderToBecome(client, partition1, None)
+    TestUtils.waitForLeaderToBecome(client, partition2, None)
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    assertEquals(Option(broker2), currentLeader(client, partition1))
-    assertEquals(Option(broker2), currentLeader(client, partition2))
+    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
+    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition2))
   }
 
   @Test
@@ -1482,21 +1482,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    waitForLeaderToBecome(client, partition1, Option(broker1))
-    waitForLeaderToBecome(client, partition2, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
 
     servers(broker2).shutdown()
-    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    waitForLeaderToBecome(client, partition1, None)
-    waitForLeaderToBecome(client, partition2, Some(broker3))
+    TestUtils.waitForLeaderToBecome(client, partition1, None)
+    TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     assertFalse(electResult.partitions.get.containsKey(partition2))
-    assertEquals(Option(broker2), currentLeader(client, partition1))
-    assertEquals(Option(broker3), currentLeader(client, partition2))
+    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
+    assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
   }
 
   @Test
@@ -1519,7 +1519,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    waitForLeaderToBecome(client, new TopicPartition(topic, 0), Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, new TopicPartition(topic, 0), Option(broker1))
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(unknownPartition, unknownTopic).asJava)
     assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
@@ -1545,12 +1545,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
 
     servers(broker2).shutdown()
-    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    waitForLeaderToBecome(client, partition1, None)
+    TestUtils.waitForLeaderToBecome(client, partition1, None)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
     assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
@@ -1575,10 +1575,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
 
     servers(broker1).shutdown()
-    waitForLeaderToBecome(client, partition1, Some(broker2))
+    TestUtils.waitForLeaderToBecome(client, partition1, Some(broker2))
     servers(broker1).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
@@ -1607,21 +1607,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    waitForLeaderToBecome(client, partition1, Option(broker1))
-    waitForLeaderToBecome(client, partition2, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
 
     servers(broker2).shutdown()
-    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    waitForLeaderToBecome(client, partition1, None)
-    waitForLeaderToBecome(client, partition2, Some(broker3))
+    TestUtils.waitForLeaderToBecome(client, partition1, None)
+    TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
-    assertEquals(Option(broker2), currentLeader(client, partition1))
-    assertEquals(Option(broker3), currentLeader(client, partition2))
+    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
+    assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
   }
 
   @Test
@@ -1959,35 +1959,4 @@ object AdminClientIntegrationTest {
 
     assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
   }
-
-  def currentLeader(client: AdminClient, topicPartition: TopicPartition): Option[Int] = {
-    Option(
-      client
-        .describeTopics(asList(topicPartition.topic))
-        .all
-        .get
-        .get(topicPartition.topic)
-        .partitions
-        .get(topicPartition.partition)
-        .leader
-    ).map(_.id)
-  }
-
-  def waitForLeaderToBecome(client: AdminClient, topicPartition: TopicPartition, leader:
Option[Int]): Unit = {
-    TestUtils.waitUntilTrue(
-      () => currentLeader(client, topicPartition) == leader,
-      s"Expected leader to become $leader", 10000
-    )
-  }
-
-  def waitForBrokerOutOfIsr(client: AdminClient, partitions: Set[TopicPartition], brokerId:
Int): Unit = {
-    TestUtils.waitUntilTrue(
-      () => {
-        val description = client.describeTopics(partitions.map(_.topic).asJava).all.get.asScala
-        val isr = description.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
-        isr.forall(_.id != brokerId)
-      },
-      s"Expect broker $brokerId to no longer be in any ISR for $partitions"
-    )
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index ae78fa4..8b4d84f 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -79,12 +79,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
 
       servers(broker3).shutdown()
-      waitForBrokerOutOfIsr(client, Set(topicPartition), broker3)
+      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
       servers(broker2).shutdown()
-      waitForLeaderToBecome(client, topicPartition, None)
+      TestUtils.waitForLeaderToBecome(client, topicPartition, None)
       servers(broker3).startup()
 
       LeaderElectionCommand.main(
@@ -95,7 +95,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker3), currentLeader(client, topicPartition))
+      assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
     }
   }
 
@@ -110,12 +110,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
 
       servers(broker3).shutdown()
-      waitForBrokerOutOfIsr(client, Set(topicPartition), broker3)
+      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
       servers(broker2).shutdown()
-      waitForLeaderToBecome(client, topicPartition, None)
+      TestUtils.waitForLeaderToBecome(client, topicPartition, None)
       servers(broker3).startup()
 
       LeaderElectionCommand.main(
@@ -127,7 +127,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker3), currentLeader(client, topicPartition))
+      assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
     }
   }
 
@@ -142,12 +142,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
 
       servers(broker3).shutdown()
-      waitForBrokerOutOfIsr(client, Set(topicPartition), broker3)
+      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
       servers(broker2).shutdown()
-      waitForLeaderToBecome(client, topicPartition, None)
+      TestUtils.waitForLeaderToBecome(client, topicPartition, None)
       servers(broker3).startup()
 
       val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
@@ -160,7 +160,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker3), currentLeader(client, topicPartition))
+      assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
     }
   }
 
@@ -175,12 +175,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
 
       servers(broker2).shutdown()
-      waitForLeaderToBecome(client, topicPartition, Some(broker3))
+      TestUtils.waitForLeaderToBecome(client, topicPartition, Some(broker3))
       servers(broker2).startup()
-      waitForBrokerInIsr(client, Set(topicPartition), broker2)
+      TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
 
       LeaderElectionCommand.main(
         Array(
@@ -190,7 +190,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker2), currentLeader(client, topicPartition))
+      assertEquals(Option(broker2), TestUtils.currentLeader(client, topicPartition))
     }
   }
 
@@ -319,48 +319,6 @@ object LeaderElectionCommandTest {
     }.headOption.mkString(",")
   }
 
-  def currentLeader(client: JAdminClient, topicPartition: TopicPartition): Option[Int] =
{
-    Option(
-      client
-        .describeTopics(List(topicPartition.topic).asJava)
-        .all
-        .get
-        .get(topicPartition.topic)
-        .partitions
-        .get(topicPartition.partition)
-        .leader
-    ).map(_.id)
-  }
-
-  def waitForLeaderToBecome(client: JAdminClient, topicPartition: TopicPartition, leader:
Option[Int]): Unit = {
-    TestUtils.waitUntilTrue(
-      () => currentLeader(client, topicPartition) == leader,
-      s"Expected leader to become $leader", 10000
-    )
-  }
-
-  def waitForBrokerOutOfIsr(client: JAdminClient, partitions: Set[TopicPartition], brokerId:
Int): Unit = {
-    TestUtils.waitUntilTrue(
-      () => {
-        val description = client.describeTopics(partitions.map(_.topic).asJava).all.get.asScala
-        val isr = description.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
-        isr.forall(_.id != brokerId)
-      },
-      s"Expect broker $brokerId to no longer be in any ISR for $partitions"
-    )
-  }
-
-  def waitForBrokerInIsr(client: JAdminClient, partitions: Set[TopicPartition], brokerId:
Int): Unit = {
-    TestUtils.waitUntilTrue(
-      () => {
-        val description = client.describeTopics(partitions.map(_.topic).asJava).all.get.asScala
-        val isr = description.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
-        isr.exists(_.id == brokerId)
-      },
-      s"Expect broker $brokerId to no longer be in any ISR for $partitions"
-    )
-  }
-
   def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = {
     val file = File.createTempFile("leader-election-command", ".json")
     file.deleteOnExit()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d97f6c1..24c7e19 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,7 +24,9 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
-import java.util.{Collections, Properties}
+import java.util.Arrays
+import java.util.Collections
+import java.util.Properties
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
 
 import javax.net.ssl.X509TrustManager
@@ -1111,7 +1113,7 @@ object TestUtils extends Logging {
     waitUntilTrue(() => servers.forall(server =>
       server.config.logDirs.forall { logDir =>
         topicPartitions.forall { tp =>
-          !java.util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryName
=>
+          !Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryName
=>
             partitionDirectoryName.startsWith(tp.topic + "-" + tp.partition) &&
               partitionDirectoryName.endsWith(Log.DeleteDirSuffix)
           }
@@ -1460,6 +1462,58 @@ object TestUtils extends Logging {
     adminClient.alterConfigs(configs)
   }
 
+  def currentLeader(client: AdminClient, topicPartition: TopicPartition): Option[Int] = {
+    Option(
+      client
+        .describeTopics(Arrays.asList(topicPartition.topic))
+        .all
+        .get
+        .get(topicPartition.topic)
+        .partitions
+        .get(topicPartition.partition)
+        .leader
+    ).map(_.id)
+  }
+
+  def waitForLeaderToBecome(client: AdminClient, topicPartition: TopicPartition, leader:
Option[Int]): Unit = {
+    TestUtils.waitUntilTrue(
+      () => currentLeader(client, topicPartition) == leader,
+      s"Expected leader to become $leader", 10000
+    )
+  }
+
+  def waitForBrokersOutOfIsr(client: AdminClient, partition: Set[TopicPartition], brokerIds:
Set[Int]): Unit = {
+    TestUtils.waitUntilTrue(
+      () => {
+        val description = client.describeTopics(partition.map(_.topic).asJava).all.get.asScala
+        val isr = description
+          .values
+          .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
+          .map(_.id)
+          .toSet
+
+        brokerIds.intersect(isr).isEmpty
+      },
+      s"Expected brokers $brokerIds to no longer in the ISR for $partition"
+    )
+  }
+
+  def waitForBrokersInIsr(client: AdminClient, partition: TopicPartition, brokerIds: Set[Int]):
Unit = {
+    TestUtils.waitUntilTrue(
+      () => {
+        val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala
+        val isr = description
+          .values
+          .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
+          .map(_.id)
+          .toSet
+
+        brokerIds.subsetOf(isr)
+      },
+      s"Expected brokers $brokerIds to be in the ISR for $partition"
+    )
+  }
+
   /**
    * Capture the console output during the execution of the provided function.
    */


Mime
View raw message