kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: MINOR: Fix bug in `waitUntilLeaderIsElectedOrChanged` and simplify result type
Date Mon, 15 May 2017 23:08:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7bb551b4a -> dd9f43140


MINOR: Fix bug in `waitUntilLeaderIsElectedOrChanged` and simplify result type

Also disable a couple of tests that were passing incorrectly; they should be re-enabled once KAFKA-3096 is fixed.

The bug affected the following case:

`leader.isDefined && oldLeaderOpt.isEmpty && newLeaderOpt.isDefined && newLeaderOpt.get != leader.get`

We would consider it a successful election even though the new leader was not the expected
leader.

I also changed the result type as we never return `None` (we throw an exception instead).
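
For context, here is a condensed sketch in Scala (abbreviated from the old and new bodies of
`TestUtils.waitUntilLeaderIsElectedOrChanged` shown in the diff below) of why the old branch order
accepted any elected leader whenever `oldLeaderOpt` was empty, even if `newLeaderOpt` was defined
and did not match:

    // old logic, for the currently observed leader `l`
    if (newLeaderOpt.isDefined && newLeaderOpt.get == l)
      isLeaderElectedOrChanged = true   // expected new leader elected
    else if (oldLeaderOpt.isDefined && oldLeaderOpt.get != l)
      isLeaderElectedOrChanged = true   // leader changed away from the old leader
    else if (oldLeaderOpt.isEmpty)
      isLeaderElectedOrChanged = true   // BUG: also fires when newLeaderOpt is defined
                                        // but newLeaderOpt.get != l
    // the new logic matches on (newLeaderOpt, oldLeaderOpt) and only treats the
    // (None, None) case as "any elected leader counts as success"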

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3031 from ijuma/fix-wait-until-leader-is-elected


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd9f4314
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd9f4314
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd9f4314

Branch: refs/heads/trunk
Commit: dd9f43140d4a898576d86f6d5b7be1a021de1040
Parents: 7bb551b
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon May 15 16:08:06 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon May 15 16:08:06 2017 -0700

----------------------------------------------------------------------
 .../integration/kafka/api/BaseQuotaTest.scala   |  5 +-
 .../kafka/api/ProducerBounceTest.scala          |  4 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    | 16 ++---
 .../test/scala/unit/kafka/admin/AdminTest.scala |  4 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |  6 +-
 .../integration/UncleanLeaderElectionTest.scala | 12 ++--
 .../unit/kafka/server/FetchRequestTest.scala    |  2 +-
 .../unit/kafka/server/LeaderElectionTest.scala  | 26 ++++----
 .../unit/kafka/server/LogRecoveryTest.scala     | 14 ++--
 .../unit/kafka/server/ProduceRequestTest.scala  |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 69 +++++++++++---------
 11 files changed, 80 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 8d879a2..918bb55 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -75,9 +75,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
 
     val numPartitions = 1
     val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, serverCount, servers)
-    leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1)
-    followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1)
-    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+    leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
+    followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 7b65c4f..5fead18 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -67,9 +67,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     val numPartitions = 3
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
-    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
-
-    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+    TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
 
     val scheduler = new ProducerScheduler()
     scheduler.start

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index e9c5ac5..d08552e 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -87,12 +87,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   def testIncrementPartitions {
     AdminUtils.addPartitions(zkUtils, topic1, 3)
     // wait until leader is elected
-    var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
-    var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
+    val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
     val leader1FromZk = zkUtils.getLeaderForPartition(topic1, 1).get
     val leader2FromZk = zkUtils.getLeaderForPartition(topic1, 2).get
-    assertEquals(leader1.get, leader1FromZk)
-    assertEquals(leader2.get, leader2FromZk)
+    assertEquals(leader1, leader1FromZk)
+    assertEquals(leader2, leader2FromZk)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
@@ -114,12 +114,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   def testManualAssignmentOfReplicas {
     AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3")
     // wait until leader is elected
-    var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
-    var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2)
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
+    val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2)
     val leader1FromZk = zkUtils.getLeaderForPartition(topic2, 1).get
     val leader2FromZk = zkUtils.getLeaderForPartition(topic2, 2).get
-    assertEquals(leader1.get, leader1FromZk)
-    assertEquals(leader2.get, leader2FromZk)
+    assertEquals(leader1, leader1FromZk)
+    assertEquals(leader2, leader2FromZk)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index f8c65eb..377501c 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -348,11 +348,11 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
-    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get
+    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None)
     // trigger preferred replica election
     val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkUtils, Set(TopicAndPartition(topic, partition)))
     preferredReplicaElection.moveLeaderToPreferredReplica()
-    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get
+    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader))
     assertEquals("Preferred replica election failed", preferredReplica, newLeader)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index d9ab85e..15018f5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -193,8 +193,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // re-create topic on same replicas
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // wait until leader is elected
-    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
-    assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
     // check if all replica logs are created
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
@@ -220,8 +219,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // test the topic path exists
     assertTrue("Topic test mistakenly deleted", zkUtils.pathExists(getTopicPath(topic)))
     // topic test should have a leader
-    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
-    assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 2597e81..25ed480 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -18,7 +18,7 @@
 package kafka.integration
 
 import org.apache.kafka.common.config.ConfigException
-import org.junit.{Test, After, Before}
+import org.junit.{After, Before, Ignore, Test}
 
 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
@@ -115,6 +115,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  @Ignore // Should be re-enabled after KAFKA-3096 is fixed
   def testUncleanLeaderElectionDisabled {
     // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
@@ -142,6 +143,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  @Ignore // Should be re-enabled after KAFKA-3096 is fixed
   def testCleanLeaderElectionDisabledByTopicOverride {
     // enable unclean leader election globally, but disable for our specific test topic
     configProps1.put("unclean.leader.election.enable", "true")
@@ -172,9 +174,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   def verifyUncleanLeaderElectionEnabled {
     // wait until leader is elected
-    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
-    assertTrue("Leader should get elected", leaderIdOpt.isDefined)
-    val leaderId = leaderIdOpt.get
+    val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
     assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1
|| leaderId == brokerId2)
 
@@ -207,9 +207,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   def verifyUncleanLeaderElectionDisabled {
     // wait until leader is elected
-    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
-    assertTrue("Leader should get elected", leaderIdOpt.isDefined)
-    val leaderId = leaderIdOpt.get
+    val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
     assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1
|| leaderId == brokerId2)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 73e48af..48b3945 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -214,7 +214,7 @@ class FetchRequestTest extends BaseRequestTest {
     topics.flatMap { topic =>
       val partitionToLeader = createTopic(zkUtils, topic, numPartitions = numPartitions, replicationFactor = 2,
         servers = servers, topicConfig = topicConfig)
-      partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader.get }
+      partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader }
     }.toMap
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index aa243be..37e0966 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -75,22 +75,21 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
     val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("leader Epoch: " + leaderEpoch1)
-    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
-    assertTrue("Leader should get elected", leader1.isDefined)
+    debug("Leader is elected to be: %s".format(leader1))
     // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1)
== 1))
+    assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1)
     assertEquals("First epoch value should be 0", 0, leaderEpoch1)
 
     // kill the server hosting the preferred replica
     servers.last.shutdown()
     // check if leader moves to the other server
     val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId,
-                                                    oldLeaderOpt = if(leader1.get == 0) None else leader1)
+                                                    oldLeaderOpt = if (leader1 == 0) None else Some(leader1))
     val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId)
-    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    debug("Leader is elected to be: %s".format(leader1))
     debug("leader Epoch: " + leaderEpoch2)
-    assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
-    if(leader1.get == leader2.get)
+    assertEquals("Leader must move to broker 0", 0, leader2)
+    if (leader1 == leader2)
       assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2)
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1,
leaderEpoch2)
@@ -99,12 +98,12 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     servers.head.shutdown()
     Thread.sleep(zookeeper.tickTime)
     val leader3 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId,
-                                                    oldLeaderOpt = if(leader2.get == 1) None else leader2)
+                                                    oldLeaderOpt = if (leader2 == 1) None else Some(leader2))
     val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("leader Epoch: " + leaderEpoch3)
-    debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
-    assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
-    if(leader2.get == leader3.get)
+    debug("Leader is elected to be: %s".format(leader3))
+    assertEquals("Leader must return to 1", 1, leader3)
+    if (leader2 == leader3)
       assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1,
leaderEpoch3)
@@ -121,10 +120,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
     val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("leader Epoch: " + leaderEpoch1)
-    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
-    assertTrue("Leader should get elected", leader1.isDefined)
+    debug("Leader is elected to be: %s".format(leader1))
     // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1)
== 1))
+    assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1)
     assertEquals("First epoch value should be 0", 0, leaderEpoch1)
 
     // start another controller

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 0ecc3c7..a5f0dba 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -130,8 +130,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
-    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader))
+    assertEquals("Leader must move to broker 1", 1, leader)
 
     // bring the preferred replica back
     server1.startup()
@@ -140,7 +140,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it
can move to broker 0",
-      leader.isDefined && (leader.get == 0 || leader.get == 1))
+      leader == 0 || leader == 1)
 
     assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed
to disk yet
@@ -149,9 +149,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     server2.startup()
     updateProducer()
-    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
+    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader))
     assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it
can move to broker 1",
-      leader.isDefined && (leader.get == 0 || leader.get == 1))
+      leader == 0 || leader == 1)
 
     sendMessages(1)
     hw += 1
@@ -202,8 +202,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     server2.startup()
     updateProducer()
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
-    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader))
+    assertEquals("Leader must move to broker 1", 1, leader)
 
     assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 2f16719..189f57c 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -63,7 +63,7 @@ class ProduceRequestTest extends BaseRequestTest {
   private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = {
     val partitionToLeader = TestUtils.createTopic(zkUtils, topic, 3, 2, servers)
     partitionToLeader.collectFirst {
-      case (partition, Some(leader)) if leader != -1 => (partition, leader)
+      case (partition, leader) if leader != -1 => (partition, leader)
     }.getOrElse(fail(s"No leader elected for topic $topic"))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 012fdfd..6bee18d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -258,7 +258,7 @@ object TestUtils extends Logging {
                   numPartitions: Int = 1,
                   replicationFactor: Int = 1,
                   servers: Seq[KafkaServer],
-                  topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = {
+                  topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
     // create topic
     AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
     // wait until the update metadata request for new topic reaches all servers
@@ -274,7 +274,7 @@ object TestUtils extends Logging {
    * Return the leader for each partition.
    */
   def createTopic(zkUtils: ZkUtils, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
-                  servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
+                  servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = {
     // create topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaAssignment)
     // wait until the update metadata request for new topic reaches all servers
@@ -744,46 +744,55 @@ object TestUtils extends Logging {
    *  If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
    *  If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
    *
-   * @return The new leader or assertion failure if timeout is reached.
+   * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
+   *         LeaderDuringDelete).
+   * @throws AssertionError if the expected condition is not true within the timeout.
    */
-  def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int,
-                                        timeoutMs: Long = 30000,
-                                        oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = {
+  def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int, timeoutMs: Long = 30000L,
+                                        oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Int = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
     val startTime = System.currentTimeMillis()
-    var isLeaderElectedOrChanged = false
+    val topicPartition = new TopicPartition(topic, partition)
 
-    trace("Waiting for leader to be elected or changed for partition [%s,%d], older leader
is %s, new leader is %s"
-          .format(topic, partition, oldLeaderOpt, newLeaderOpt))
+    trace(s"Waiting for leader to be elected or changed for partition $topicPartition, old
leader is $oldLeaderOpt, " +
+      s"new leader is $newLeaderOpt")
 
     var leader: Option[Int] = None
-    while (!isLeaderElectedOrChanged && System.currentTimeMillis() < startTime + timeoutMs) {
+    var electedOrChangedLeader: Option[Int] = None
+    while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {
       // check if leader is elected
       leader = zkUtils.getLeaderForPartition(topic, partition)
       leader match {
-        case Some(l) =>
-          if (newLeaderOpt.isDefined && newLeaderOpt.get == l) {
-            trace("Expected new leader %d is elected for partition [%s,%d]".format(l, topic,
partition))
-            isLeaderElectedOrChanged = true
-          } else if (oldLeaderOpt.isDefined && oldLeaderOpt.get != l) {
-            trace("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition,
oldLeaderOpt.get, l))
-            isLeaderElectedOrChanged = true
-          } else if (oldLeaderOpt.isEmpty) {
-            trace("Leader %d is elected for partition [%s,%d]".format(l, topic, partition))
-            isLeaderElectedOrChanged = true
-          } else {
-            trace("Current leader for partition [%s,%d] is %d".format(topic, partition, l))
-          }
+        case Some(l) => (newLeaderOpt, oldLeaderOpt) match {
+          case (Some(newLeader), _) if newLeader == l =>
+            trace(s"Expected new leader $l is elected for partition $topicPartition")
+            electedOrChangedLeader = leader
+          case (_, Some(oldLeader)) if oldLeader != l =>
+            trace(s"Leader for partition $topicPartition is changed from $oldLeader to $l")
+            electedOrChangedLeader = leader
+          case (None, None) =>
+            trace(s"Leader $l is elected for partition $topicPartition")
+            electedOrChangedLeader = leader
+          case _ =>
+            trace(s"Current leader for partition $topicPartition is $l")
+        }
         case None =>
-          trace("Leader for partition [%s,%d] is not elected yet".format(topic, partition))
+          trace(s"Leader for partition $topicPartition is not elected yet")
       }
-      Thread.sleep(timeoutMs.min(100L))
+      Thread.sleep(math.min(timeoutMs, 100L))
+    }
+    electedOrChangedLeader.getOrElse {
+      val errorMessage = (newLeaderOpt, oldLeaderOpt) match {
+        case (Some(newLeader), _) =>
+          s"Timing out after $timeoutMs ms since expected new leader $newLeader was not elected
for partition $topicPartition, leader is $leader"
+        case (_, Some(oldLeader)) =>
+          s"Timing out after $timeoutMs ms since a new leader that is different from $oldLeader
was not elected for partition $topicPartition, " +
+            s"leader is $leader"
+        case _ =>
+          s"Timing out after $timeoutMs ms since a leader was not elected for partition $topicPartition"
+      }
+      fail(errorMessage)
     }
-    if (!isLeaderElectedOrChanged)
-      fail("Timing out after %d ms since leader is not elected or changed for partition [%s,%d]"
-           .format(timeoutMs, topic, partition))
-
-    leader
   }
 
   /**

