kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1367821 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/ main/scala/kafka/server/ test/scala/other/kafka/ test/scala/unit/kafka/consumer/ test/scala/unit/kafka/controller/ test/scala/unit/kafka/integration/ test/scala/uni...
Date Tue, 31 Jul 2012 23:27:12 GMT
Author: nehanarkhede
Date: Tue Jul 31 23:27:11 2012
New Revision: 1367821

URL: http://svn.apache.org/viewvc?rev=1367821&view=rev
Log:
KAFKA-384 Fix intermittent unit test failures and remove Thread.sleep statements; patched
by Neha Narkhede; reviewed by Joel Koshy, Jun Rao and Jay Kreps

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Jul 31 23:27:11 2012
@@ -115,7 +115,7 @@ class LogSegment(val file: File, val mes
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, time: Time)
   extends Logging {
 
   import kafka.log.Log._
@@ -288,7 +288,7 @@ private[kafka] class Log(val dir: File, 
         else {
           // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
           // file name. So simply reuse the last segment and reset the modified time.
-          view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
+          view(numToDelete - 1).file.setLastModified(time.milliseconds)
           numToDelete -=1
         }
       }
@@ -348,10 +348,10 @@ private[kafka] class Log(val dir: File, 
 
     lock synchronized {
       debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
-          System.currentTimeMillis)
+          time.milliseconds)
       segments.view.last.messageSet.flush()
       unflushed.set(0)
-      lastflushedTime.set(System.currentTimeMillis)
+      lastflushedTime.set(time.milliseconds)
      }
   }
 
@@ -366,7 +366,7 @@ private[kafka] class Log(val dir: File, 
     for (i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
     if (segsArray.last.size > 0)
-      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), SystemTime.milliseconds)
+      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), time.milliseconds)
 
     var startIndex = -1
     request.time match {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Jul 31
23:27:11 2012
@@ -60,7 +60,7 @@ private[kafka] class LogManager(val conf
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val log = new Log(dir, maxSize, flushInterval, needRecovery, time)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartion._1)
@@ -100,7 +100,7 @@ private[kafka] class LogManager(val conf
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false)
+      new Log(d, maxSize, flushInterval, false, time)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Jul
31 23:27:11 2012
@@ -70,7 +70,7 @@ class KafkaServer(val config: KafkaConfi
     /* start log manager */
     logManager = new LogManager(config,
                                 kafkaScheduler,
-                                SystemTime,
+                                time,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue
Jul 31 23:27:11 2012
@@ -267,7 +267,7 @@ class ReplicaManager(val config: KafkaCo
   /**
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
-  private def checkpointHighwaterMarks() {
+  def checkpointHighwaterMarks() {
     val highwaterMarksForAllPartitions = allReplicas.map { partition =>
       val topic = partition._1._1
       val partitionId = partition._1._2

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
Tue Jul 31 23:27:11 2012
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.message._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{SystemTime, TestUtils, Utils}
 
 object TestLogPerformance {
 
@@ -30,7 +30,7 @@ object TestLogPerformance {
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, false)
+    val log = new Log(dir, 50*1024*1024, 5000000, false, SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
Tue Jul 31 23:27:11 2012
@@ -148,7 +148,6 @@ class ZookeeperConsumerConnectorTest ext
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String,
Int]())
     // send some messages to each broker
-    Thread.sleep(200)
     val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
@@ -234,7 +233,6 @@ class ZookeeperConsumerConnectorTest ext
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String,
Int]())
     // send some messages to each broker
-    Thread.sleep(200)
     val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages,
GZIPCompressionCodec)
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages,
GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum
< t.checksum)
@@ -265,7 +263,6 @@ class ZookeeperConsumerConnectorTest ext
 
     // shutdown one server
     servers.last.shutdown
-    Thread.sleep(500)
 
     // send some messages to each broker
     val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
Tue Jul 31 23:27:11 2012
@@ -47,20 +47,20 @@ class ControllerBasicTest extends JUnit3
     brokers(0).shutdown()
     brokers(1).shutdown()
     brokers(3).shutdown()
-    Thread.sleep(1000)
-
+    assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
     var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
-    assertEquals(curController, "2")
+    assertEquals("Controller should move to broker 2", "2", curController)
 
     brokers(1).startup()
     brokers(2).shutdown()
-    Thread.sleep(1000)
+    assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
     curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
-    assertEquals(curController, "1")
+    assertEquals("Controller should move to broker 1", "1", curController)
   }
 
   def testControllerCommandSend(){
-    Thread.sleep(1000)
     for(broker <- brokers){
       if(broker.kafkaController.isActive){
         val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
Tue Jul 31 23:27:11 2012
@@ -93,7 +93,6 @@ class LazyInitProducerTest extends JUnit
       }
 
       // wait a bit for produced message to be available
-      Thread.sleep(200)
       val request = builder.build()
       val response = consumer.fetch(request)
       for( (topic, offset) <- topicOffsets) {
@@ -138,7 +137,6 @@ class LazyInitProducerTest extends JUnit
     producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for(topic <- topics) {
@@ -166,7 +164,6 @@ class LazyInitProducerTest extends JUnit
 
     producer.send(produceList: _*)
     // wait a bit for produced message to be available
-    Thread.sleep(750)
     val request = builder.build()
     val response = consumer.fetch(request)
     for(topic <- topics) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
Tue Jul 31 23:27:11 2012
@@ -49,7 +49,6 @@ class LogCorruptionTest extends JUnit3Su
     val producerData = new ProducerData[String, Message](topic, topic, List(new Message("hello".getBytes())))
 
     producer.send(producerData)
-    Thread.sleep(200)
 
     // corrupt the file on disk
     val logFile = new File(config.logDir + File.separator + topic + "-" + partition, Log.nameFromOffset(0))
@@ -61,7 +60,6 @@ class LogCorruptionTest extends JUnit3Su
     channel.force(true)
     channel.close
 
-    Thread.sleep(500)
     // test SimpleConsumer
     val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0,
10000).build())
     try {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
Tue Jul 31 23:27:11 2012
@@ -31,8 +31,8 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.admin.CreateTopicCommand
 import kafka.common.{ErrorMapping, InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.admin.{AdminUtils, CreateTopicCommand}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -109,7 +109,6 @@ class PrimitiveApiTest extends JUnit3Sui
 
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
-    Thread.sleep(200)
 
     val request = new FetchRequestBuilder()
       .correlationId(100)
@@ -138,7 +137,6 @@ class PrimitiveApiTest extends JUnit3Sui
 
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
-    Thread.sleep(200)
 
     var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
     val messageSet = fetched.messageSet(topic, 0)
@@ -167,7 +165,6 @@ class PrimitiveApiTest extends JUnit3Sui
     }
 
       // wait a bit for produced message to be available
-      Thread.sleep(700)
       val request = builder.build()
       val response = consumer.fetch(request)
       for( (topic, partition) <- topics) {
@@ -235,7 +232,6 @@ class PrimitiveApiTest extends JUnit3Sui
       }
 
       // wait a bit for produced message to be available
-      Thread.sleep(200)
       val request = builder.build()
       val response = consumer.fetch(request)
       for( (topic, partition) <- topics) {
@@ -303,7 +299,6 @@ class PrimitiveApiTest extends JUnit3Sui
     producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
@@ -328,7 +323,6 @@ class PrimitiveApiTest extends JUnit3Sui
     producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
@@ -337,10 +331,12 @@ class PrimitiveApiTest extends JUnit3Sui
     }
   }
 
-  def testConsumerNotExistTopic() {
+  def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
-    Thread.sleep(200)
+    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+      AdminUtils.getTopicMetaDataFromZK(List(newTopic),
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
Tue Jul 31 23:27:11 2012
@@ -41,6 +41,7 @@ trait ProducerConsumerTestHarness extend
       props.put("reconnect.interval", "10000")
       props.put("producer.retry.backoff.ms", "1000")
       props.put("producer.num.retries", "3")
+      props.put("producer.request.required.acks", "-1")
       producer = new Producer(new ProducerConfig(props))
       consumer = new SimpleConsumer(host,
                                    port,

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Tue
Jul 31 23:27:11 2012
@@ -118,7 +118,6 @@ class LogManagerTest extends JUnit3Suite
     val retentionMs = 1000 * 60 * 60 * retentionHours
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will
be 10 messages
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep
exactly 6 segments + 1 roll over
@@ -138,9 +137,8 @@ class LogManagerTest extends JUnit3Suite
       log.append(set)
       offset += set.sizeInBytes
     }
-    // flush to make sure it's written to disk, then sleep to confirm
+    // flush to make sure it's written to disk
     log.flush
-    Thread.sleep(2000)
 
     // should be exactly 100 full segments + 1 new empty one
     assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
@@ -163,7 +161,6 @@ class LogManagerTest extends JUnit3Suite
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024 *1024 *1024
                    override val flushSchedulerThreadRate = 50
@@ -186,7 +183,6 @@ class LogManagerTest extends JUnit3Suite
   def testConfigurablePartitions() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 256
                    override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue
Jul 31 23:27:11 2012
@@ -43,6 +43,7 @@ class LogOffsetTest extends JUnit3Suite 
   var logSize: Int = 100
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
+  var time: Time = new MockTime()
 
   @Before
   override def setUp() {
@@ -50,8 +51,8 @@ class LogOffsetTest extends JUnit3Suite 
     val config: Properties = createBrokerConfig(1, brokerPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
-
-    server = TestUtils.createServer(new KafkaConfig(config))
+    time = new MockTime()
+    server = TestUtils.createServer(new KafkaConfig(config), time)
     simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
   }
 
@@ -90,7 +91,6 @@ class LogOffsetTest extends JUnit3Suite 
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    Thread.sleep(100)
 
     val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
 
@@ -148,15 +148,16 @@ class LogOffsetTest extends JUnit3Suite 
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    val now = System.currentTimeMillis
-    Thread.sleep(100)
+    time.sleep(20)
+    val now = time.milliseconds
 
     val offsetRequest = new OffsetRequest(topic, part, now, 10)
     val offsets = log.getOffsetsBefore(offsetRequest)
-    assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+    println("Offsets = " + offsets.mkString(","))
+    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
 
     val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
-    assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
   }
 
   @Test
@@ -175,8 +176,6 @@ class LogOffsetTest extends JUnit3Suite 
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    Thread.sleep(100)
-
     val offsetRequest = new OffsetRequest(topic, part,
                                           OffsetRequest.EarliestTime, 10)
     val offsets = log.getOffsetsBefore(offsetRequest)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Jul
31 23:27:11 2012
@@ -22,13 +22,14 @@ import java.util.ArrayList
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, TestUtils, Range}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.utils.{MockTime, Utils, TestUtils, Range}
 
 class LogTest extends JUnitSuite {
   
   var logDir: File = null
+  val time = new MockTime
 
   @Before
   def setUp() {
@@ -48,14 +49,14 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, false)
+    new Log(logDir, 1024, 1000, false, time)
   }
 
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, false)
+      new Log(logDir, 1024, 1000, false, time)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: KafkaException => "This is good"
@@ -64,7 +65,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, 1024, 1000, false, time)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -81,7 +82,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, 1024, 1000, false, time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -101,7 +102,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, false)
+    val log = new Log(logDir, 100, 1000, false, time)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -156,7 +157,7 @@ class LogTest extends JUnitSuite {
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, 100, 1000, false, time)
       val curOffset = log.logEndOffset
       assertEquals(curOffset, 0)
 
@@ -169,7 +170,7 @@ class LogTest extends JUnitSuite {
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, 100, 1000, false, time)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
Tue Jul 31 23:27:11 2012
@@ -55,20 +55,14 @@ class KafkaLog4jAppenderTest extends JUn
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
     serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
-
-    Thread.sleep(100)
-
     simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
   }
 
   @After
   override def tearDown() {
     simpleConsumerZk.close
-
     serverZk.shutdown
     Utils.rm(logDirZk)
-
-    Thread.sleep(500)
     super.tearDown()
   }
 
@@ -149,8 +143,6 @@ class KafkaLog4jAppenderTest extends JUn
     for(i <- 1 to 5)
       info("test")
 
-    Thread.sleep(500)
-
     val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic",
0, 0L, 1024*1024).build())
     val fetchMessage = response.messageSet("test-topic", 0)
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
Tue Jul 31 23:27:11 2012
@@ -29,12 +29,12 @@ import kafka.message.{NoCompressionCodec
 import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
-import kafka.utils.{FixedValuePartitioner, NegativePartitioner, TestZKUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.Map
 import scala.collection.mutable.ListBuffer
+import kafka.utils._
 
 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -106,13 +106,6 @@ class AsyncProducerTest extends JUnit3Su
     }
   }
 
-  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
-    val producerDataList = new ListBuffer[ProducerData[String,String]]
-    for (i <- 0 until nEvents)
-      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
-    producerDataList
-  }
-
   @Test
   def testBatchSize() {
     /**
@@ -150,17 +143,18 @@ class AsyncProducerTest extends JUnit3Su
     EasyMock.expectLastCall
     EasyMock.replay(mockHandler)
 
+    val queueExpirationTime = 200
     val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
     val producerSendThread =
-      new ProducerSendThread[String,String]("thread1", queue, mockHandler, 200, 5)
+      new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5)
     producerSendThread.start()
 
     for (producerData <- producerDataList)
       queue.put(producerData)
 
-    Thread.sleep(300)
-    producerSendThread.shutdown
+    Thread.sleep(queueExpirationTime + 10)
     EasyMock.verify(mockHandler)
+    producerSendThread.shutdown
   }
 
   @Test
@@ -446,7 +440,7 @@ class AsyncProducerTest extends JUnit3Su
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
     val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
-    val response1 = 
+    val response1 =
       new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
     val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
     val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
@@ -530,6 +524,13 @@ class AsyncProducerTest extends JUnit3Su
     }
   }
 
+  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
+    val producerDataList = new ListBuffer[ProducerData[String,String]]
+    for (i <- 0 until nEvents)
+      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
+    producerDataList
+  }
+
   private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
     val encoder = new StringEncoder
     new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
@@ -565,11 +566,4 @@ class AsyncProducerTest extends JUnit3Su
     val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
-
-  class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
-    override def send(produceRequest: ProducerRequest): ProducerResponse = {
-      Thread.sleep(1000)
-      null
-    }
-  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
Tue Jul 31 23:27:11 2012
@@ -68,8 +68,6 @@ class ProducerTest extends JUnit3Suite w
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
-
-    Thread.sleep(500)
   }
 
   override def tearDown() {
@@ -81,7 +79,6 @@ class ProducerTest extends JUnit3Suite w
     server2.awaitShutdown()
     Utils.rm(server1.config.logDir)
     Utils.rm(server2.config.logDir)
-    Thread.sleep(500)
     super.tearDown()
   }
 
@@ -151,6 +148,7 @@ class ProducerTest extends JUnit3Suite w
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("producer.request.timeout.ms", "2000")
+//    props.put("producer.request.required.acks", "-1")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     // create topic
@@ -166,7 +164,6 @@ class ProducerTest extends JUnit3Suite w
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
     } catch {
       case e => fail("Unexpected exception: " + e)
     }
@@ -174,12 +171,10 @@ class ProducerTest extends JUnit3Suite w
     // kill the broker
     server1.shutdown
     server1.awaitShutdown()
-    Thread.sleep(100)
 
     try {
       // These sends should fail since there are no available brokers
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e => // success
@@ -187,8 +182,6 @@ class ProducerTest extends JUnit3Suite w
 
     // restart server 1
     server1.startup()
-    Thread.sleep(100)
-
     try {
       // cross check if broker 1 got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
@@ -222,7 +215,6 @@ class ProducerTest extends JUnit3Suite w
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
       // cross check if brokers got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
Tue Jul 31 23:27:11 2012
@@ -41,8 +41,7 @@ class HighwatermarkPersistenceTest exten
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
     replicaManager.startup()
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
     assertEquals(0L, fooPartition0Hw)
     val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
@@ -51,8 +50,7 @@ class HighwatermarkPersistenceTest exten
     // create leader and follower replicas
     val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
     val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
     assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
     try {
@@ -65,8 +63,7 @@ class HighwatermarkPersistenceTest exten
     partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
     // set the highwatermark for local replica
     partition0.leaderHW(Some(5L))
-    // sleep until flush interval
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
     assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
     EasyMock.verify(zkClient)
@@ -85,8 +82,7 @@ class HighwatermarkPersistenceTest exten
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
     replicaManager.startup()
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
     assertEquals(0L, topic1Partition0Hw)
     val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
@@ -94,16 +90,14 @@ class HighwatermarkPersistenceTest exten
     val topic1Log0 = getMockLog
     // create leader and follower replicas
     val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
     assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
     // set the leader
     topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
     // set the highwatermark for local replica
     topic1Partition0.leaderHW(Some(5L))
-    // sleep until flush interval
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
     assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
     assertEquals(5L, topic1Partition0Hw)
@@ -113,8 +107,7 @@ class HighwatermarkPersistenceTest exten
     val topic2Log0 = getMockLog
     // create leader and follower replicas
     val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
     assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
     // set the leader
@@ -125,8 +118,7 @@ class HighwatermarkPersistenceTest exten
     // change the highwatermark for topic1
     topic1Partition0.leaderHW(Some(10L))
     assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
-    // sleep until flush interval
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     // verify checkpointed hw for topic 2
     topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
     assertEquals(15L, topic2Partition0Hw)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
Tue Jul 31 23:27:11 2012
@@ -123,8 +123,8 @@ class LogRecoveryTest extends JUnit3Suit
 
     sendMessages()
     // give some time for follower 1 to record leader HW of 60
-    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 500)
-
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
@@ -165,7 +165,8 @@ class LogRecoveryTest extends JUnit3Suit
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
     sendMessages(20)
     // give some time for follower 1 to record leader HW of 600
-    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 600L, 500)
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server2.getReplica(topic, 0).get.highWatermark() == 600L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
@@ -209,7 +210,8 @@ class LogRecoveryTest extends JUnit3Suit
 
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
@@ -231,7 +233,8 @@ class LogRecoveryTest extends JUnit3Suit
 
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    TestUtils.waitUntilTrue(() => server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000)
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
Tue Jul 31 23:27:11 2012
@@ -54,7 +54,6 @@ class ServerShutdownTest extends JUnit3S
       // send some messages
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
 
-      Thread.sleep(200)
       // do a clean shutdown
       server.shutdown()
       val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
@@ -86,8 +85,6 @@ class ServerShutdownTest extends JUnit3S
       // send some more messages
       producer.send(new ProducerData[Int, Message](topic, 0, sent2))
 
-      Thread.sleep(200)
-
       fetchedMessage = null
       while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
         val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue
Jul 31 23:27:11 2012
@@ -100,8 +100,8 @@ object TestUtils extends Logging {
    * USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
    * @param config The configuration of the server
    */
-  def createServer(config: KafkaConfig): KafkaServer = {
-    val server = new KafkaServer(config)
+  def createServer(config: KafkaConfig, time: Time = SystemTime): KafkaServer = {
+    val server = new KafkaServer(config, time)
     server.startup()
     server
   }
@@ -422,7 +422,6 @@ object TestUtils extends Logging {
         return true
       if (System.currentTimeMillis() > startTime + waitTime)
         return false
-      Thread.sleep(100)
     }
     // should never hit here
     throw new RuntimeException("unexpected error")

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1367821&r1=1367820&r2=1367821&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Tue
Jul 31 23:27:11 2012
@@ -39,17 +39,11 @@ class ZKEphemeralTest extends JUnit3Suit
     }
 
     var testData: String = null
-
     testData = ZkUtils.readData(zkClient, "/tmp/zktest")
     Assert.assertNotNull(testData)
-
     zkClient.close
-
-    Thread.sleep(zkSessionTimeoutMs)
-
     zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
                                 ZKStringSerializer)
-
     val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
     Assert.assertFalse(nodeExists)
   }



Mime
View raw message