kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1420361 - in /kafka/trunk/core/src: main/scala/kafka/log/ main/scala/kafka/server/ test/scala/other/kafka/ test/scala/unit/kafka/log/ test/scala/unit/kafka/message/ test/scala/unit/kafka/utils/
Date Tue, 11 Dec 2012 19:47:16 GMT
Author: jkreps
Date: Tue Dec 11 19:47:13 2012
New Revision: 1420361

URL: http://svn.apache.org/viewvc?rev=1420361&view=rev
Log:
KAFKA-636 Make log segment delete an asynchronous background action done by the scheduler.
Patch reviewed by Jun and Neha.

Modified:
    kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
    kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
    kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala

Modified: kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/Log.scala Tue Dec 11 19:47:13 2012
@@ -48,7 +48,8 @@ import com.yammer.metrics.core.Gauge
  * 
  */
 @threadsafe
-class Log(val dir: File, 
+class Log(val dir: File,
+          val scheduler: Scheduler,
           val maxSegmentSize: Int,
           val maxMessageSize: Int, 
           val flushInterval: Int = Int.MaxValue,
@@ -56,6 +57,7 @@ class Log(val dir: File, 
           val needsRecovery: Boolean, 
           val maxIndexSize: Int = (10*1024*1024),
           val indexIntervalBytes: Int = 4096,
+          val segmentDeleteDelayMs: Long = 60000,
           time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
@@ -90,22 +92,28 @@ class Log(val dir: File, 
     val logSegments = new ConcurrentSkipListMap[Long, LogSegment]
     val ls = dir.listFiles()
     if(ls != null) {
-      for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix))
{
-        if(!file.canRead)
-          throw new IOException("Could not read file " + file)
-        val filename = file.getName()
-        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
-        val hasIndex = Log.indexFilename(dir, start).exists
-        val segment = new LogSegment(dir = dir, 
-                                     startOffset = start,
-                                     indexIntervalBytes = indexIntervalBytes, 
-                                     maxIndexSize = maxIndexSize)
-        if(!hasIndex) {
-          // this can only happen if someone manually deletes the index file
-          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
-          segment.recover(maxMessageSize)
+      for(file <- ls if file.isFile) {
+        val filename = file.getName
+        if(filename.endsWith(DeletedFileSuffix)) {
+          val deleted = file.delete()
+          if(!deleted)
+            warn("Attempt to delete defunct segment file %s failed.".format(filename))
+        } else if(filename.endsWith(LogFileSuffix)) {
+          if(!file.canRead)
+            throw new IOException("Could not read file " + file)
+          val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+          val hasIndex = Log.indexFilename(dir, start).exists
+          val segment = new LogSegment(dir = dir, 
+                                       startOffset = start,
+                                       indexIntervalBytes = indexIntervalBytes, 
+                                       maxIndexSize = maxIndexSize)
+          if(!hasIndex) {
+            // this can only happen if someone manually deletes the index file
+            error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+            segment.recover(maxMessageSize)
+          }
+          logSegments.put(start, segment)
         }
-        logSegments.put(start, segment)
       }
     }
 
@@ -332,10 +340,8 @@ class Log(val dir: File, 
         if(segments.size == numToDelete)
           roll()
         // remove the segments for lookups
-        deletable.foreach(d => segments.remove(d.baseOffset))
+        deletable.foreach(deleteSegment(_))
       }
-      // do not lock around actual file deletion, it isn't O(1) on many filesystems
-      deletable.foreach(_.delete())
     }
     numToDelete
   }
@@ -425,7 +431,7 @@ class Log(val dir: File, 
   }
 
   /**
-   * Delete this log from the filesystem entirely
+   * Completely delete this log directory and all contents from the file system with no delay
    */
   def delete(): Unit = {
     logSegments.foreach(_.delete())
@@ -449,8 +455,7 @@ class Log(val dir: File, 
         truncateFullyAndStartAt(targetOffset)
       } else {
         val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
-        deletable.foreach(s => segments.remove(s.baseOffset))
-        deletable.foreach(_.delete())
+        deletable.foreach(deleteSegment(_))
         activeSegment.truncateTo(targetOffset)
         this.nextOffset.set(targetOffset)
       }
@@ -465,8 +470,7 @@ class Log(val dir: File, 
     debug("Truncate and start log '" + name + "' to " + newOffset)
     lock synchronized {
       val segmentsToDelete = logSegments.toList
-      segments.clear()
-      segmentsToDelete.foreach(_.delete())
+      segmentsToDelete.foreach(deleteSegment(_))
       segments.put(newOffset, 
                    new LogSegment(dir, 
                                   newOffset,
@@ -493,6 +497,41 @@ class Log(val dir: File, 
   
   override def toString() = "Log(" + this.dir + ")"
   
+  /**
+   * This method performs an asynchronous log segment delete by doing the following:
+   * <ol>
+   *   <li>It removes the segment from the segment map so that it will no longer be used for reads.
+   *   <li>It renames the index and log files by appending .deleted to the respective file name
+   *   <li>It schedules an asynchronous delete operation to occur after segmentDeleteDelayMs
+   * </ol>
+   * This allows reads to happen concurrently without synchronization and without the possibility of physically
+   * deleting a file while it is being read from. Throws KafkaStorageException if a rename fails while the source file still exists.
+   * 
+   * @param segment The log segment to schedule for deletion
+   */
+  private def deleteSegment(segment: LogSegment) {
+    info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, dir.getName))
+    lock synchronized {
+      segments.remove(segment.baseOffset)
+      val deletedLog = new File(segment.log.file.getPath + Log.DeletedFileSuffix)
+      val deletedIndex = new File(segment.index.file.getPath + Log.DeletedFileSuffix)
+      val renamedLog = segment.log.file.renameTo(deletedLog)
+      val renamedIndex = segment.index.file.renameTo(deletedIndex)
+      if(!renamedLog && segment.log.file.exists)
+        throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.log.file.getPath, deletedLog.getPath, name))
+      if(!renamedIndex && segment.index.file.exists)
+        throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.index.file.getPath, deletedIndex.getPath, name))
+      def asyncDeleteFiles() {
+        info("Deleting log segment %s for log %s.".format(segment.baseOffset, name))
+        if(!deletedLog.delete())
+          warn("Failed to delete log segment file %s for log %s.".format(deletedLog.getPath, name))
+        if(!deletedIndex.delete())
+          warn("Failed to delete index segment file %s for log %s.".format(deletedIndex.getPath, name))
+      }
+      scheduler.schedule("delete-log-segment", asyncDeleteFiles, delay = segmentDeleteDelayMs)
+    }
+  }
+  
 }
 
 /**
@@ -501,6 +540,7 @@ class Log(val dir: File, 
 object Log {
   val LogFileSuffix = ".log"
   val IndexFileSuffix = ".index"
+  val DeletedFileSuffix = ".deleted"
 
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros

Modified: kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Tue Dec 11 19:47:13 2012
@@ -36,9 +36,9 @@ import kafka.server.KafkaConfig
  * A background thread handles log retention by periodically truncating excess log segments.
  */
 @threadsafe
-private[kafka] class LogManager(val config: KafkaConfig,
-                                scheduler: Scheduler,
-                                private val time: Time) extends Logging {
+class LogManager(val config: KafkaConfig,
+                 scheduler: Scheduler,
+                 private val time: Time) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
@@ -116,7 +116,8 @@ private[kafka] class LogManager(val conf
             val topicPartition = parseTopicPartitionName(dir.getName)
             val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
             val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
-            val log = new Log(dir, 
+            val log = new Log(dir,
+                              scheduler,
                               maxLogFileSize, 
                               config.maxMessageSize, 
                               logFlushInterval, 
@@ -124,6 +125,7 @@ private[kafka] class LogManager(val conf
                               needsRecovery, 
                               config.logIndexMaxSizeBytes,
                               config.logIndexIntervalBytes,
+                              config.logDeleteDelayMs,
                               time)
             val previous = this.logs.put(topicPartition, log)
             if(previous != null)
@@ -198,6 +200,7 @@ private[kafka] class LogManager(val conf
       val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
       val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
       log = new Log(dir, 
+                    scheduler,
                     maxLogFileSize, 
                     config.maxMessageSize, 
                     logFlushInterval, 
@@ -205,6 +208,7 @@ private[kafka] class LogManager(val conf
                     needsRecovery = false, 
                     config.logIndexMaxSizeBytes, 
                     config.logIndexIntervalBytes, 
+                    config.logDeleteDelayMs,
                     time)
       info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic,
topicAndPartition.partition, dataDir.getAbsolutePath))
       logs.put(topicAndPartition, log)

Modified: kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Tue Dec 11 19:47:13 2012
@@ -115,6 +115,8 @@ class KafkaConfig private (val props: Ve
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
   val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
+  
+  val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)

Modified: kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Tue Dec 11 19:47:13
2012
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.message._
-import kafka.utils.{SystemTime, TestUtils, Utils}
+import kafka.utils.{SystemTime, TestUtils, Utils, KafkaScheduler}
 import kafka.server.KafkaConfig
 
 object TestLogPerformance {
@@ -33,7 +33,8 @@ object TestLogPerformance {
     val props = TestUtils.createBrokerConfig(0, -1)
     val config = new KafkaConfig(props)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = SystemTime)
+    val scheduler = new KafkaScheduler(1)
+    val log = new Log(dir, scheduler, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L,
needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Tue Dec 11 19:47:13
2012
@@ -85,19 +85,19 @@ class LogManagerTest extends JUnit3Suite
   def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog(name, 0)
     var offset = 0L
-    for(i <- 0 until 1000) {
+    for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
       val info = log.append(set)
       offset = info.lastOffset
     }
-
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
-
-    // update the last modified time of all log segments
+    
     log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
-
+    
     time.sleep(maxLogAgeHours*60*60*1000 + 1)
-    assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
+    assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
+    time.sleep(log.segmentDeleteDelayMs + 1)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
 
     try {
@@ -131,18 +131,21 @@ class LogManagerTest extends JUnit3Suite
     var offset = 0L
 
     // add a bunch of messages that should be larger than the retentionSize
-    for(i <- 0 until 1000) {
+    val numMessages = 200
+    for(i <- 0 until numMessages) {
       val set = TestUtils.singleMessageSet("test".getBytes())
       val info = log.append(set)
       offset = info.firstOffset
     }
 
     // should be exactly 100 full segments + 1 new empty one
-    assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
+    assertEquals("Check we have the expected number of segments.", numMessages * setSize
/ config.logFileSize, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
+    time.sleep(log.segmentDeleteDelayMs + 1)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
     try {
       log.read(0, 1024)

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Dec 11 19:47:13 2012
@@ -65,7 +65,7 @@ class LogTest extends JUnitSuite {
     val time: MockTime = new MockTime()
 
     // create a log
-    val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery =
false, time = time)
+    val log = new Log(logDir, time.scheduler, 1000, config.maxMessageSize, 1000, rollMs,
needsRecovery = false, time = time)
     time.sleep(rollMs + 1)
 
     // segment age is less than its limit
@@ -98,7 +98,7 @@ class LogTest extends JUnitSuite {
     val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery
= false, time = time)
+    val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000,
needsRecovery = false, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -114,7 +114,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
     log.append(TestUtils.singleMessageSet("test".getBytes))
   }
 
@@ -123,7 +123,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithSequentialOffsets() {
-    val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
     
     for(i <- 0 until messages.length)
@@ -142,7 +142,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
-    val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val messages = messageIds.map(id => new Message(id.toString.getBytes))
     
@@ -165,7 +165,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReadAtLogGap() {
-    val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
     
     // keep appending until we have two segments with only a single message in the second
segment
     while(log.numberOfSegments == 1)
@@ -185,7 +185,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024,
1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -208,7 +208,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
     val offsets = messageSets.map(log.append(_).firstOffset)
@@ -232,7 +232,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val log = new Log(logDir, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
     
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3
*/
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes),
new Message("there".getBytes)))
@@ -255,7 +255,7 @@ class LogTest extends JUnitSuite {
     for(messagesToAppend <- List(0, 1, 25)) {
       logDir.mkdirs()
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+      val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singleMessageSet(i.toString.getBytes))
       
@@ -289,7 +289,7 @@ class LogTest extends JUnitSuite {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val log = new Log(logDir, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, time.scheduler, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -311,7 +311,7 @@ class LogTest extends JUnitSuite {
     val messageSize = 100
     val segmentSize = 7 * messageSize
     val indexInterval = 3 * messageSize
-    var log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    var log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
     assertEquals("After appending %d messages to an empty log, the log end offset should
be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@@ -320,14 +320,14 @@ class LogTest extends JUnitSuite {
     log.close()
     
     // test non-recovery case
-    log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages),
numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries,
log.activeSegment.index.entries)
     log.close()
     
     // test recovery case
-    log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages),
numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries,
log.activeSegment.index.entries)
@@ -341,7 +341,7 @@ class LogTest extends JUnitSuite {
   def testIndexRebuild() {
     // publish the messages and close the log
     val numMessages = 200
-    var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+    var log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -351,7 +351,7 @@ class LogTest extends JUnitSuite {
     indexFiles.foreach(_.delete())
     
     // reopen the log
-    log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+    log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
     
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages,
log.logEndOffset)
     for(i <- 0 until numMessages)
@@ -370,7 +370,7 @@ class LogTest extends JUnitSuite {
     val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery
= false, time = time)
+    val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000,
needsRecovery = false, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -422,7 +422,7 @@ class LogTest extends JUnitSuite {
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
-    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery
= false, time = time)
+    val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000,
needsRecovery = false, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
     for (i<- 1 to msgPerSeg)
       log.append(set)
@@ -448,6 +448,7 @@ class LogTest extends JUnitSuite {
 
     // create a log
     var log = new Log(logDir, 
+                      time.scheduler,
                       maxSegmentSize = set.sizeInBytes * 5, 
                       maxMessageSize = config.maxMessageSize, 
                       maxIndexSize = 1000, 
@@ -459,6 +460,7 @@ class LogTest extends JUnitSuite {
       log.append(set)
     log.close()
     log = new Log(logDir, 
+                  time.scheduler,
                   maxSegmentSize = set.sizeInBytes * 5, 
                   maxMessageSize = config.maxMessageSize, 
                   maxIndexSize = 1000, 
@@ -469,4 +471,68 @@ class LogTest extends JUnitSuite {
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
   }
   
+  /**
+   * Test that segments removed by deleteOldSegments have their log files renamed with the .deleted suffix right away, and that those renamed files are deleted once segmentDeleteDelayMs has passed.
+   */
+  @Test
+  def testAsyncDelete() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val asyncDeleteMs = 1000
+    val log = new Log(logDir, 
+                      time.scheduler,
+                      maxSegmentSize = set.sizeInBytes * 5, 
+                      maxMessageSize = config.maxMessageSize, 
+                      maxIndexSize = 1000, 
+                      indexIntervalBytes = 10000, 
+                      segmentDeleteDelayMs = asyncDeleteMs,
+                      needsRecovery = true)
+    
+    // append some messages to create some segments
+    for(i <- 0 until 100)
+      log.append(set)
+    
+    // snapshot the current segments, then mark every segment as deletable; their log files should be renamed, not removed
+    val segments = log.logSegments.toArray
+    log.deleteOldSegments((s) => true)
+    
+    assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
+    val renamed = segments.map(segment => new File(segment.log.file.getPath + Log.DeletedFileSuffix))
+    assertTrue("Files should all be renamed to .deleted.", renamed.forall(_.exists))
+    
+    // when enough time passes (the delete delay plus 1 ms) the renamed files should be deleted
+    time.sleep(asyncDeleteMs + 1)
+    assertTrue("Files should all be gone.", renamed.forall(!_.exists))
+  }
+  
+  /**
+   * Any files ending in .deleted should be removed when the log is re-opened, so that segments scheduled for deletion before close() are not loaded again.
+   */
+  @Test
+  def testOpenDeletesObsoleteFiles() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    var log = new Log(logDir, 
+                      time.scheduler,
+                      maxSegmentSize = set.sizeInBytes * 5, 
+                      maxMessageSize = config.maxMessageSize, 
+                      maxIndexSize = 1000, 
+                      indexIntervalBytes = 10000, 
+                      needsRecovery = false)
+    
+    // append some messages to create some segments
+    for(i <- 0 until 100)
+      log.append(set)
+    
+    log.deleteOldSegments((s) => true)
+    log.close()
+    
+    log = new Log(logDir, 
+                  time.scheduler,
+                  maxSegmentSize = set.sizeInBytes * 5, 
+                  maxMessageSize = config.maxMessageSize, 
+                  maxIndexSize = 1000, 
+                  indexIntervalBytes = 10000,
+                  needsRecovery = false)
+    assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+  }
+  
 }

Modified: kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala Tue Dec 11 19:47:13
2012
@@ -17,8 +17,8 @@
 
 package kafka.message
 
-import java.util._
 import java.nio._
+import java.util.HashMap
 import scala.collection._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
@@ -45,7 +45,7 @@ class MessageTest extends JUnitSuite {
   }
   
   @Test
-  def testFieldValues = {
+  def testFieldValues {
     for(v <- messages) {
       TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
       assertEquals(Message.CurrentMagicValue, v.message.magic)
@@ -69,7 +69,7 @@ class MessageTest extends JUnitSuite {
   }
   
   @Test
-  def testEquality() = {
+  def testEquality() {
     for(v <- messages) {
       assertFalse("Should not equal null", v.message.equals(null))
       assertFalse("Should not equal a random string", v.message.equals("asdf"))
@@ -80,7 +80,7 @@ class MessageTest extends JUnitSuite {
   }
   
   @Test
-  def testIsHashable() = {
+  def testIsHashable() {
     // this is silly, but why not
     val m = new HashMap[Message, Message]()
     for(v <- messages)

Modified: kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala Tue Dec 11 19:47:13
2012
@@ -16,7 +16,7 @@
  */
 package kafka.utils
 
-import scala.collection._
+import scala.collection.mutable.PriorityQueue
 import java.util.concurrent.TimeUnit
 
 /**
@@ -30,16 +30,13 @@ import java.util.concurrent.TimeUnit
  *   time.sleep(1001) // this should cause our scheduled task to fire
  * </code>
  *   
- * Two gotchas:
- * <ol>
- * <li> Incrementing the time by more than one task period will result in the correct
number of executions of each scheduled task
- * but the order of these executions is not specified.
- * <li> Incrementing the time to the exact next execution time of a task will result
in that task executing (it as if execution itself takes no time)
- * </ol>
+ * Incrementing the time to the exact next execution time of a task will result in that task
executing (it is as if execution itself takes no time).
  */
+@nonthreadsafe
 class MockScheduler(val time: Time) extends Scheduler {
   
-  var tasks = mutable.ArrayBuffer[MockScheduled]()
+  /* a priority queue of tasks ordered by next execution time */
+  var tasks = new PriorityQueue[MockTask]()
 
   def startup() {}
   
@@ -50,34 +47,38 @@ class MockScheduler(val time: Time) exte
   /**
    * Check for any tasks that need to execute. Since this is a mock scheduler this check
only occurs
    * when this method is called and the execution happens synchronously in the calling thread.
-   * If you are using the scheduler associated with a MockTime instance this call will happen
automatically.
+   * If you are using the scheduler associated with a MockTime instance this call will be triggered
automatically.
    */
   def tick() {
-    var tasks = mutable.ArrayBuffer[MockScheduled]()
     val now = time.milliseconds
-    for(task <- this.tasks) {
-      if(task.nextExecution <= now) {
-        if(task.period >= 0) {
-          val executions = (now - task.nextExecution) / task.period
-          for(i <- 0 to executions.toInt)
-            task.fun()
-          task.nextExecution += (executions + 1) * task.period
-          tasks += task
-        } else {
-          task.fun()
-        }
-      } else {
-        tasks += task
+    while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+      /* pop and execute the task with the lowest next execution time */
+      val curr = tasks.head
+      this.tasks = tasks.tail
+      curr.fun()
+      /* if the task is periodic, reschedule it and re-enqueue */
+      if(curr.periodic) {
+        curr.nextExecution += curr.period
+        this.tasks += curr
       }
     }
-    this.tasks = tasks
   }
   
   def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit:
TimeUnit = TimeUnit.MILLISECONDS) {
-    tasks += MockScheduled(name, fun, time.milliseconds + delay, period = period)
+    tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
     tick()
   }
   
 }
 
-case class MockScheduled(val name: String, val fun: () => Unit, var nextExecution: Long,
val period: Long)
\ No newline at end of file
+case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val
period: Long) extends Ordered[MockTask] {
+  def periodic = period >= 0
+  def compare(t: MockTask): Int = {
+    if(t.nextExecution == nextExecution)
+      return 0
+    else if (t.nextExecution < nextExecution)
+      return -1
+    else
+      return 1
+  }
+}
\ No newline at end of file

Modified: kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala?rev=1420361&r1=1420360&r2=1420361&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala Tue Dec 11 19:47:13
2012
@@ -67,6 +67,13 @@ class SchedulerTest {
   }
   
   @Test
+  def testReentrantTaskInMockScheduler() {
+    mockTime.scheduler.schedule("test1", () => mockTime.scheduler.schedule("test2", counter2.getAndIncrement,
delay=0), delay=1)
+    mockTime.sleep(1)
+    assertEquals(1, counter2.get)
+  }
+  
+  @Test
   def testNonPeriodicTask() {
     scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
     retry(30000, () => assertEquals(counter1.get, 1))



Mime
View raw message