kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8962; Use least loaded node for AdminClient#describeTopics (#7421)
Date Fri, 18 Oct 2019 06:24:02 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 1be8fb9  KAFKA-8962; Use least loaded node for AdminClient#describeTopics (#7421)
1be8fb9 is described below

commit 1be8fb9e3781533dc2c0d269979ef6a18aa56a6b
Author: Dhruvil Shah <dhruvil@confluent.io>
AuthorDate: Fri Oct 18 02:08:35 2019 -0400

    KAFKA-8962; Use least loaded node for AdminClient#describeTopics (#7421)
    
    Allow routing of `AdminClient#describeTopics` to any broker in the cluster than just the
controller, so that we don't create a hotspot for this API call. `AdminClient#describeTopics`
uses the broker's metadata cache which is asynchronously maintained, so routing to brokers
other than the controller is not expected to have a significant difference in terms of metadata
consistency; all metadata requests are eventually consistent.
    
    This patch also fixes a few flaky test failures.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@gmail.com>,
Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   2 +-
 .../scala/kafka/controller/KafkaController.scala   |   2 +-
 .../kafka/api/AdminClientIntegrationTest.scala     | 153 ++++++++++++---------
 .../api/SaslSslAdminClientIntegrationTest.scala    |  25 +++-
 .../kafka/admin/LeaderElectionCommandTest.scala    |  26 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  36 ++---
 6 files changed, 142 insertions(+), 102 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d4958f2..0850ced 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1537,7 +1537,7 @@ public class KafkaAdminClient extends AdminClient {
         }
         final long now = time.milliseconds();
         Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new ControllerNodeProvider()) {
+            new LeastLoadedNodeProvider()) {
 
             private boolean supportsDisablingTopicCreation = true;
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 91c4534..0dd2ec7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -790,7 +790,7 @@ class KafkaController(val config: KafkaConfig,
     electionType: ElectionType,
     electionTrigger: ElectionTrigger
   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
-    info(s"Starting replica leader election ($electionType) for partitions ${partitions.mkString(",")}
triggerd by $electionTrigger")
+    info(s"Starting replica leader election ($electionType) for partitions ${partitions.mkString(",")}
triggered by $electionTrigger")
     try {
       val strategy = electionType match {
         case ElectionType.PREFERRED => PreferredReplicaPartitionLeaderElectionStrategy
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index f2e7af6..6ff4f0e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -37,10 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.ConsumerGroupState
-import org.apache.kafka.common.ElectionType
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.TopicPartitionReplica
+import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicPartition, TopicPartitionInfo,
TopicPartitionReplica}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.errors._
@@ -296,15 +293,14 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     waitForTopics(client, expectedPresent = Seq(topic), expectedMissing = List())
 
     // without includeAuthorizedOperations flag
-    var topicResult = client.describeTopics(Seq(topic).asJava).values
-    assertEquals(Set().asJava, topicResult.get(topic).get().authorizedOperations())
+    var topicResult = getTopicMetadata(client, topic)
+    assertEquals(Set().asJava, topicResult.authorizedOperations)
 
     //with includeAuthorizedOperations flag
-    topicResult = client.describeTopics(Seq(topic).asJava,
-      new DescribeTopicsOptions().includeAuthorizedOperations(true)).values
+    topicResult = getTopicMetadata(client, topic, new DescribeTopicsOptions().includeAuthorizedOperations(true))
     expectedOperations = Topic.supportedOperations
       .map(operation => operation.toJava).asJava
-    assertEquals(expectedOperations, topicResult.get(topic).get().authorizedOperations())
+    assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
   def configuredClusterPermissions() : Set[AclOperation] = {
@@ -559,17 +555,19 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     createTopic(topic2, numPartitions = 1, replicationFactor = 2)
 
     // assert that both the topics have 1 partition
-    assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
-    assertEquals(1, client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size)
+    val topic1_metadata = getTopicMetadata(client, topic1)
+    val topic2_metadata = getTopicMetadata(client, topic2)
+    assertEquals(1, topic1_metadata.partitions.size)
+    assertEquals(1, topic2_metadata.partitions.size)
 
     val validateOnly = new CreatePartitionsOptions().validateOnly(true)
     val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
 
-    def partitions(topic: String) =
-      client.describeTopics(Set(topic).asJava).values.get(topic).get.partitions
+    def partitions(topic: String, expectedNumPartitionsOpt: Option[Int] = None): util.List[TopicPartitionInfo]
= {
+      getTopicMetadata(client, topic, expectedNumPartitionsOpt = expectedNumPartitionsOpt).partitions
+    }
 
-    def numPartitions(topic: String) =
-      partitions(topic).size
+    def numPartitions(topic: String): Int = partitions(topic).size
 
     // validateOnly: try creating a new partition (no assignments), to bring the total to
3 partitions
     var alterResult = client.createPartitions(Map(topic1 ->
@@ -581,7 +579,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging
{
     alterResult = client.createPartitions(Map(topic1 ->
       NewPartitions.increaseTo(3)).asJava, actuallyDoIt)
     altered = alterResult.values.get(topic1).get
-    assertEquals(3, numPartitions(topic1))
+    TestUtils.waitUntilTrue(() => numPartitions(topic1) == 3, "Timed out waiting for new
partitions to appear")
 
     // validateOnly: now try creating a new partition (with assignments), to bring the total
to 3 partitions
     val newPartition2Assignments = asList[util.List[Integer]](asList(0, 1), asList(1, 2))
@@ -594,7 +592,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging
{
     alterResult = client.createPartitions(Map(topic2 ->
       NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, actuallyDoIt)
     altered = alterResult.values.get(topic2).get
-    val actualPartitions2 = partitions(topic2)
+    val actualPartitions2 = partitions(topic2, expectedNumPartitionsOpt = Some(3))
     assertEquals(3, actualPartitions2.size)
     assertEquals(Seq(0, 1), actualPartitions2.get(1).replicas.asScala.map(_.id).toList)
     assertEquals(Seq(1, 2), actualPartitions2.get(2).replicas.asScala.map(_.id).toList)
@@ -782,7 +780,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging
{
       topic2 -> NewPartitions.increaseTo(2)).asJava, actuallyDoIt)
     // assert that the topic1 now has 4 partitions
     altered = alterResult.values.get(topic1).get
-    assertEquals(4, numPartitions(topic1))
+    TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out waiting for new
partitions to appear")
     try {
       altered = alterResult.values.get(topic2).get
     } catch {
@@ -1452,15 +1450,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
     TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition
-> prefer0), servers)
 
-    def preferredLeader(topicPartition: TopicPartition) =
-      client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
-        get.partitions.get(topicPartition.partition).replicas.get(0).id
+    def preferredLeader(topicPartition: TopicPartition): Int = {
+      val partitionMetadata = getTopicMetadata(client, topicPartition.topic).partitions.get(topicPartition.partition)
+      val preferredLeaderMetadata = partitionMetadata.replicas.get(0)
+      preferredLeaderMetadata.id
+    }
 
     /** Changes the <i>preferred</i> leader without changing the <i>current</i>
leader. */
     def changePreferredLeader(newAssignment: Seq[Int]) = {
       val preferred = newAssignment.head
-      val prior1 = TestUtils.currentLeader(client, partition1).get
-      val prior2 = TestUtils.currentLeader(client, partition2).get
+      val prior1 = zkClient.getLeaderForPartition(partition1).get
+      val prior2 = zkClient.getLeaderForPartition(partition2).get
 
       var m = Map.empty[TopicPartition, Seq[Int]]
 
@@ -1475,26 +1475,26 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
         s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)}
and ${preferredLeader(partition2)}",
         10000)
       // Check the leader hasn't moved
-      assertEquals(Some(prior1), TestUtils.currentLeader(client, partition1))
-      assertEquals(Some(prior2), TestUtils.currentLeader(client, partition2))
+      TestUtils.assertLeader(client, partition1, prior1)
+      TestUtils.assertLeader(client, partition2, prior2)
     }
 
     // Check current leaders are 0
-    assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
-    assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, 0)
+    TestUtils.assertLeader(client, partition2, 0)
 
     // Noop election
     var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
     var exception = electResult.partitions.get.get(partition1).get
     assertEquals(classOf[ElectionNotNeededException], exception.getClass)
     assertEquals("Leader election not needed for topic partition", exception.getMessage)
-    assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
+    TestUtils.assertLeader(client, partition1, 0)
 
     // Noop election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertTrue(electResult.partitions.get.isEmpty)
-    assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
-    assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, 0)
+    TestUtils.assertLeader(client, partition2, 0)
 
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
@@ -1503,17 +1503,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
     assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
-    TestUtils.waitForLeaderToBecome(client, partition1, Some(1))
+    TestUtils.assertLeader(client, partition1, 1)
 
     // topic 2 unchanged
     assertFalse(electResult.partitions.get.containsKey(partition2))
-    assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition2, 0)
 
     // meaningful election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    TestUtils.waitForLeaderToBecome(client, partition2, Some(1))
+    TestUtils.assertLeader(client, partition2, 1)
 
     // unknown topic
     val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
@@ -1522,8 +1522,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     exception = electResult.partitions.get.get(unknownPartition).get
     assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
     assertEquals("The partition does not exist.", exception.getMessage)
-    assertEquals(Some(1), TestUtils.currentLeader(client, partition1))
-    assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, 1)
+    TestUtils.assertLeader(client, partition2, 1)
 
     // Now change the preferred leader to 2
     changePreferredLeader(prefer2)
@@ -1531,8 +1531,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     // mixed results
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition, partition1).asJava)
     assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet)
-    TestUtils.waitForLeaderToBecome(client, partition1, Some(2))
-    assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, 2)
+    TestUtils.assertLeader(client, partition2, 1)
     exception = electResult.partitions.get.get(unknownPartition).get
     assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
     assertEquals("The partition does not exist.", exception.getMessage)
@@ -1541,7 +1541,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava)
     assertEquals(Set(partition2).asJava, electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    TestUtils.waitForLeaderToBecome(client, partition2, Some(2))
+    TestUtils.assertLeader(client, partition2, 2)
 
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
@@ -1557,7 +1557,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
     assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
-    assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
+    TestUtils.assertLeader(client, partition1, 2)
 
     // preferred leader unavailable with null argument
     electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout)
@@ -1572,8 +1572,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
 
-    assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
-    assertEquals(Some(2), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, 2)
+    TestUtils.assertLeader(client, partition2, 2)
   }
 
   @Test
@@ -1588,17 +1588,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     val partition1 = new TopicPartition("unclean-test-topic-1", 0)
     TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition
-> assignment1), servers)
 
-    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.assertLeader(client, partition1, broker1)
 
     servers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    TestUtils.waitForLeaderToBecome(client, partition1, None)
+    TestUtils.assertNoLeader(client, partition1)
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
-    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
+    TestUtils.assertLeader(client, partition1, broker2)
   }
 
   @Test
@@ -1622,21 +1622,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
-    TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
+    TestUtils.assertLeader(client, partition1, broker1)
+    TestUtils.assertLeader(client, partition2, broker1)
 
     servers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
     servers(broker1).shutdown()
-    TestUtils.waitForLeaderToBecome(client, partition1, None)
-    TestUtils.waitForLeaderToBecome(client, partition2, None)
+    TestUtils.assertNoLeader(client, partition1)
+    TestUtils.assertNoLeader(client, partition2)
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
-    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, broker2)
+    TestUtils.assertLeader(client, partition2, broker2)
   }
 
   @Test
@@ -1661,21 +1661,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
-    TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
+    TestUtils.assertLeader(client, partition1, broker1)
+    TestUtils.assertLeader(client, partition2, broker1)
 
     servers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    TestUtils.waitForLeaderToBecome(client, partition1, None)
-    TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
+    TestUtils.assertNoLeader(client, partition1)
+    TestUtils.assertLeader(client, partition2, broker3)
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     assertFalse(electResult.partitions.get.containsKey(partition2))
-    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
-    assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, broker2)
+    TestUtils.assertLeader(client, partition2, broker3)
   }
 
   @Test
@@ -1698,7 +1698,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    TestUtils.waitForLeaderToBecome(client, new TopicPartition(topic, 0), Option(broker1))
+    TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(unknownPartition, unknownTopic).asJava)
     assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
@@ -1724,12 +1724,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.assertLeader(client, partition1, broker1)
 
     servers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    TestUtils.waitForLeaderToBecome(client, partition1, None)
+    TestUtils.assertNoLeader(client, partition1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
     assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
@@ -1754,10 +1754,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+    TestUtils.assertLeader(client, partition1, broker1)
 
     servers(broker1).shutdown()
-    TestUtils.waitForLeaderToBecome(client, partition1, Some(broker2))
+    TestUtils.assertLeader(client, partition1, broker2)
     servers(broker1).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
@@ -1786,21 +1786,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       servers
     )
 
-    TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
-    TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
+    TestUtils.assertLeader(client, partition1, broker1)
+    TestUtils.assertLeader(client, partition2, broker1)
 
     servers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     servers(broker1).shutdown()
-    TestUtils.waitForLeaderToBecome(client, partition1, None)
-    TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
+    TestUtils.assertNoLeader(client, partition1)
+    TestUtils.assertLeader(client, partition2, broker3)
     servers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
-    assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
-    assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
+    TestUtils.assertLeader(client, partition1, broker2)
+    TestUtils.assertLeader(client, partition2, broker3)
   }
 
   @Test
@@ -2428,4 +2428,23 @@ object AdminClientIntegrationTest {
 
     assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
   }
+
+  private def getTopicMetadata(client: Admin,
+                               topic: String,
+                               describeOptions: DescribeTopicsOptions = new DescribeTopicsOptions,
+                               expectedNumPartitionsOpt: Option[Int] = None): TopicDescription
= {
+    var result: TopicDescription = null
+
+    TestUtils.waitUntilTrue(() => {
+      val topicResult = client.describeTopics(Set(topic).asJava, describeOptions).values.get(topic)
+      try {
+        result = topicResult.get
+        expectedNumPartitionsOpt.map(_ == result.partitions.size).getOrElse(true)
+      } catch {
+        case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
=> false  // metadata may not have propagated yet, so retry
+      }
+    }, s"Timed out waiting for metadata for $topic")
+
+    result
+  }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 2a8131e..fd4b29f 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils._
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException,
TopicAuthorizationException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException,
TopicAuthorizationException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter,
ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.Assert.{assertEquals, assertTrue}
@@ -33,6 +33,7 @@ import org.junit.{After, Assert, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
+import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
 class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup
{
@@ -448,9 +449,8 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest
with
     validateMetadataAndConfigs(createResult)
     val createResponseConfig = createResult.config(topic1).get().entries.asScala
 
-    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    val describeResponseConfig = client.describeConfigs(List(topicResource).asJava).values.get(topicResource).get().entries.asScala
-    assertEquals(describeResponseConfig.size, createResponseConfig.size)
+    val describeResponseConfig = describeConfigs(topic1)
+    assertEquals(describeResponseConfig.map(_.name).toSet, createResponseConfig.map(_.name).toSet)
     describeResponseConfig.foreach { describeEntry =>
       val name = describeEntry.name
       val createEntry = createResponseConfig.find(_.name == name).get
@@ -461,6 +461,23 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest
with
     }
   }
 
+  private def describeConfigs(topic: String): Iterable[ConfigEntry] = {
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+    var configEntries: Iterable[ConfigEntry] = null
+
+    TestUtils.waitUntilTrue(() => {
+      try {
+        val topicResponse = client.describeConfigs(List(topicResource).asJava).all.get.get(topicResource)
+        configEntries = topicResponse.entries.asScala
+        true
+      } catch {
+        case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
=> false
+      }
+    }, "Timed out waiting for describeConfigs")
+
+    configEntries
+  }
+
   private def waitForDescribeAcls(client: Admin, filter: AclBindingFilter, acls: Set[AclBinding]):
Unit = {
     var lastResults: util.Collection[AclBinding] = null
     TestUtils.waitUntilTrue(() => {
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index c30011f..45fb7b9 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -80,12 +80,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.assertLeader(client, topicPartition, broker2)
 
       servers(broker3).shutdown()
       TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
       servers(broker2).shutdown()
-      TestUtils.waitForLeaderToBecome(client, topicPartition, None)
+      TestUtils.assertNoLeader(client, topicPartition)
       servers(broker3).startup()
 
       LeaderElectionCommand.main(
@@ -96,7 +96,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
+      TestUtils.assertLeader(client, topicPartition, broker3)
     }
   }
 
@@ -111,12 +111,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.assertLeader(client, topicPartition, broker2)
 
       servers(broker3).shutdown()
       TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
       servers(broker2).shutdown()
-      TestUtils.waitForLeaderToBecome(client, topicPartition, None)
+      TestUtils.assertNoLeader(client, topicPartition)
       servers(broker3).startup()
 
       LeaderElectionCommand.main(
@@ -128,7 +128,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
+      TestUtils.assertLeader(client, topicPartition, broker3)
     }
   }
 
@@ -143,12 +143,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.assertLeader(client, topicPartition, broker2)
 
       servers(broker3).shutdown()
       TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
       servers(broker2).shutdown()
-      TestUtils.waitForLeaderToBecome(client, topicPartition, None)
+      TestUtils.assertNoLeader(client, topicPartition)
       servers(broker3).startup()
 
       val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
@@ -161,7 +161,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
+      TestUtils.assertLeader(client, topicPartition, broker3)
     }
   }
 
@@ -176,10 +176,10 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
 
       val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+      TestUtils.assertLeader(client, topicPartition, broker2)
 
       servers(broker2).shutdown()
-      TestUtils.waitForLeaderToBecome(client, topicPartition, Some(broker3))
+      TestUtils.assertLeader(client, topicPartition, broker3)
       servers(broker2).startup()
       TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
 
@@ -191,7 +191,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
         )
       )
 
-      assertEquals(Option(broker2), TestUtils.currentLeader(client, topicPartition))
+      TestUtils.assertLeader(client, topicPartition, broker2)
     }
   }
 
@@ -273,7 +273,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
       LeaderElectionCommand.main(
         Array(
           "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "preferrred"
+          "--election-type", "preferred"
         )
       )
       fail()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6df853d..2e8afe3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,8 +28,8 @@ import java.util.Arrays
 import java.util.Collections
 import java.util.Properties
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-import javax.net.ssl.X509TrustManager
 
+import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log._
@@ -49,6 +49,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
Produce
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBindingFilter}
 import org.apache.kafka.common.{KafkaFuture, TopicPartition}
 import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1488,24 +1489,27 @@ object TestUtils extends Logging {
     adminClient.alterConfigs(configs)
   }
 
-  def currentLeader(client: Admin, topicPartition: TopicPartition): Option[Int] = {
-    Option(
-      client
-        .describeTopics(Arrays.asList(topicPartition.topic))
-        .all
-        .get
-        .get(topicPartition.topic)
-        .partitions
-        .get(topicPartition.partition)
-        .leader
-    ).map(_.id)
+  def assertLeader(client: Admin, topicPartition: TopicPartition, expectedLeader: Int): Unit
= {
+    waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
+  }
+
+  def assertNoLeader(client: Admin, topicPartition: TopicPartition): Unit = {
+    waitForLeaderToBecome(client, topicPartition, None)
   }
 
   def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]):
Unit = {
-    TestUtils.waitUntilTrue(
-      () => currentLeader(client, topicPartition) == leader,
-      s"Expected leader to become $leader", 10000
-    )
+    val topic = topicPartition.topic
+    val partition = topicPartition.partition
+
+    TestUtils.waitUntilTrue(() => {
+      try {
+        val topicResult = client.describeTopics(Arrays.asList(topic)).all.get.get(topic)
+        val partitionResult = topicResult.partitions.get(partition)
+        Option(partitionResult.leader).map(_.id) == leader
+      } catch {
+        case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
=> false
+      }
+    }, "Timed out waiting for leader metadata")
   }
 
   def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]):
Unit = {


Mime
View raw message