kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1392; all TestUtils.waitUntilTrue() should throw an exception if the return value is false; patched by Jun Rao; reviewed by Guozhang Wang, Neha Narkhede and Joel Koshy
Date Fri, 25 Apr 2014 20:44:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0bd4c87f9 -> c9bb24f15


kafka-1392; all TestUtils.waitUntilTrue() should throw an exception if the return value is
false; patched by Jun Rao; reviewed by Guozhang Wang, Neha Narkhede and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9bb24f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9bb24f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9bb24f1

Branch: refs/heads/trunk
Commit: c9bb24f153e0975f68386cd7818feceb3ebade3e
Parents: 0bd4c87
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Apr 25 13:44:52 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Apr 25 13:44:52 2014 -0700

----------------------------------------------------------------------
 .../test/scala/unit/kafka/admin/AdminTest.scala | 50 ++++++++++++--------
 .../kafka/integration/PrimitiveApiTest.scala    | 25 ++++++----
 .../unit/kafka/network/SocketServerTest.scala   |  7 ++-
 .../unit/kafka/producer/ProducerTest.scala      |  6 ++-
 .../unit/kafka/server/LeaderElectionTest.scala  |  3 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala | 13 ++---
 .../unit/kafka/server/LogRecoveryTest.scala     | 26 +++++-----
 .../unit/kafka/server/ReplicaFetchTest.scala    |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 37 +++++++++------
 9 files changed, 100 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 3a6c5ff..a8d92f6 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -156,16 +156,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
-      Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
+        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+      },
+      "Partition reassignment should complete")
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     // in sync replicas should not have any replica that is not in the new assigned replicas
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas,
servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic,
0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
   }
 
@@ -185,15 +187,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
+          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+      },
+      "Partition reassignment should complete")
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas,
servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic,
0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
+
     servers.foreach(_.shutdown())
   }
 
@@ -213,15 +218,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 2000)
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
+          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+      },
+      "Partition reassignment should complete")
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas,
servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic,
0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
   }
 
@@ -256,13 +263,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     reassignPartitionsCommand.reassignPartitions
     // create brokers
     val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new
KafkaConfig(b)))
-    TestUtils.waitUntilTrue(() => checkIfReassignPartitionPathExists(zkClient), 1000)
+
+    // wait until reassignment completes
+    TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
+                            "Partition reassignment should complete")
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     // ensure that there are no under replicated partitions
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas,
servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic,
0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
   }
 
@@ -318,8 +329,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     var activeServers = servers.filter(s => s.config.brokerId != 2)
     try {
       // wait for the update metadata request to trickle to the brokers
-      assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size
!= 3), 1000))
+      TestUtils.waitUntilTrue(() =>
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size
!= 3),
+        "Topic test not created after timeout")
       assertEquals(0, partitionsRemaining.size)
       var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 6d489ad..97e3b14 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -234,17 +234,24 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness
with
     }
 
     // wait until the messages are published
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1",
0)).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2",
0)).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3",
0)).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4",
0)).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1",
0)).get.logEndOffset == 2 },
+                            "Published messages should be in the log")
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2",
0)).get.logEndOffset == 2 },
+                            "Published messages should be in the log")
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3",
0)).get.logEndOffset == 2 },
+                            "Published messages should be in the log")
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4",
0)).get.logEndOffset == 2 },
+                            "Published messages should be in the log")
 
     val replicaId = servers.head.config.brokerId
-    val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0,
replicaId).get.highWatermark == 2 }, hwWaitMs)
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0,
replicaId).get.highWatermark == 2 }, hwWaitMs)
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0,
replicaId).get.highWatermark == 2 }, hwWaitMs)
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0,
replicaId).get.highWatermark == 2 }, hwWaitMs)
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0,
replicaId).get.highWatermark == 2 },
+                            "High watermark should equal to log end offset")
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0,
replicaId).get.highWatermark == 2 },
+                            "High watermark should equal to log end offset")
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0,
replicaId).get.highWatermark == 2 },
+                            "High watermark should equal to log end offset")
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0,
replicaId).get.highWatermark == 2 },
+                            "High watermark should equal to log end offset")
 
     // test if the consumer received the messages in the correct order when producer has
enabled request pipelining
     val request = builder.build()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index c3b1ac4..75bd41b 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -119,10 +119,9 @@ class SocketServerTest extends JUnitSuite {
     server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null))
 
     // After the response is sent to the client (which is async and may take a bit of time),
the socket key should be available for reads.
-    Assert.assertTrue(
-      TestUtils.waitUntilTrue(
-        () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ)
== SelectionKey.OP_READ },
-        5000))
+    TestUtils.waitUntilTrue(
+      () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ)
== SelectionKey.OP_READ },
+      "Socket key should be available for reads")
   }
 
   @Test(expected = classOf[SocketException])

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 7877da8..c3da69d 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -314,8 +314,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
 
       // create topic
       AdminUtils.createTopic(zkClient, "new-topic", 2, 1)
-      assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(()
=>
-        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
zookeeper.tickTime))
+      TestUtils.waitUntilTrue(() =>
+        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+        "Topic new-topic not created after timeout",
+        waitTime = zookeeper.tickTime)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0)
     
       producer.send(new KeyedMessage[String, String]("new-topic", "key", null))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index b278bb6..25dffcf 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -132,7 +132,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
                                                       staleControllerEpoch, 0, "")
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
-    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
+    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true,
+                            "Controller epoch should be stale")
     assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
 
     controllerChannelManager.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 3fb08e6..9556ed9 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -82,8 +82,8 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.createTopic(zkClient, topic, 1, 1)
 
     val logManager = server.getLogManager
-    assertTrue("Log for partition [topic,0] should be created",
-      waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined,
5000))
+    waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined,
+                  "Log for partition [topic,0] should be created")
     val log = logManager.getLog(TopicAndPartition(topic, part)).get
 
     val message = new Message(Integer.toString(42).getBytes())
@@ -94,8 +94,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime,
10)
     assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 
-    assertTrue("Leader should be elected",
-      waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000))
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be
elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(
       Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
@@ -158,8 +157,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now,
10)
     assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 
-    assertTrue("Leader should be elected",
-      waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000))
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be
elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now,
10)), replicaId = 0)
     val consumerOffsets =
@@ -187,8 +185,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     assertEquals(Seq(0L), offsets)
 
-    assertTrue("Leader should be elected",
-      waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000))
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be
elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest =
       OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime,
10)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7a0ef6f..1b87acf 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -77,9 +77,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     sendMessages(numMessages.toInt)
 
     // give some time for the follower 1 to record leader HW
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", 
-               TestUtils.waitUntilTrue(() =>
-                 server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages,
10000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages,
+      "Failed to update high watermark for follower after timeout")
 
     servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
     producer.close()
@@ -135,8 +135,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     hw += 1
       
     // give some time for follower 1 to record leader HW of 60
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
@@ -161,8 +162,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     sendMessages(20)
     var hw = 20L
     // give some time for follower 1 to record leader HW of 600
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
@@ -191,8 +193,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     var hw = 2L
     
     // allow some time for the follower to get the leader HW
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      "Failed to update high watermark for follower after timeout")
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
@@ -216,8 +219,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     hw += 2
     
     // allow some time for the follower to get the leader HW
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 481a400..faf466b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -71,6 +71,6 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
       }
       result
     }
-    assertTrue("Broker logs should be identical", waitUntilTrue(logsMatch, 6000))
+    waitUntilTrue(logsMatch, "Broker logs should be identical")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9bb24f1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 130b6be..498941d 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -533,15 +533,15 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Wait until the given condition is true or the given wait time ellapses
+   * Wait until the given condition is true or throw an exception if the given wait time
elapses.
    */
-  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+  def waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = 5000L): Boolean
= {
     val startTime = System.currentTimeMillis()
     while (true) {
       if (condition())
         return true
       if (System.currentTimeMillis() > startTime + waitTime)
-        return false
+        fail(msg)
       Thread.sleep(waitTime.min(100L))
     }
     // should never hit here
@@ -570,9 +570,10 @@ object TestUtils extends Logging {
   }
 
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition:
Int, timeout: Long = 5000L) = {
-    assertTrue("Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition,
timeout),
-      TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic,
partition)), timeout))
+    TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic,
partition)),
+      "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
+      waitTime = timeout)
   }
   
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
@@ -600,14 +601,22 @@ object TestUtils extends Logging {
 
   def ensureNoUnderReplicatedPartitions(zkClient: ZkClient, topic: String, partitionToBeReassigned:
Int, assignedReplicas: Seq[Int],
                                                 servers: Seq[KafkaServer]) {
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned),
-      inSyncReplicas.size < assignedReplicas.size)
-    val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
-    assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned),
leader.isDefined)
-    val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
-    assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader
%d".format(topic, partitionToBeReassigned, leader.get),
-      leaderBroker.replicaManager.underReplicatedPartitionCount() == 0)
+    TestUtils.waitUntilTrue(() => {
+        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+        inSyncReplicas.size == assignedReplicas.size
+      },
+      "Reassigned partition [%s,%d] is under replicated".format(topic, partitionToBeReassigned))
+    var leader: Option[Int] = None
+    TestUtils.waitUntilTrue(() => {
+        leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
+        leader.isDefined
+      },
+      "Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
+    TestUtils.waitUntilTrue(() => {
+        val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
+        leaderBroker.replicaManager.underReplicatedPartitionCount() == 0
+      },
+      "Reassigned partition [%s,%d] is under-replicated as reported by the leader %d".format(topic,
partitionToBeReassigned, leader.get))
   }
 
   def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {


Mime
View raw message