kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1377165 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/other/kafka/ test/scala/unit/kafka/log/
Date Sat, 25 Aug 2012 00:34:28 GMT
Author: junrao
Date: Sat Aug 25 00:34:27 2012
New Revision: 1377165

URL: http://svn.apache.org/viewvc?rev=1377165&view=rev
Log:
Time based log segment rollout (0.8 branch); patched by Swapnil Ghike; reviewed by Jun Rao,
Neha Narkhede; KAFKA-475
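
For context, this patch lets the segment roll interval be set broker-wide and overridden per topic. A minimal server.properties sketch using the properties read by KafkaConfig below (topic names and values are illustrative; per-topic entries use the same topic:value CSV form as topic.flush.intervals.ms):

    # roll a new segment at least every 24 hours (the patch default is 24*7)
    log.roll.hours=24
    # per-topic overrides in topic:value CSV form
    topic.log.roll.hours=clicks:1,audit:168
    # the existing size-based threshold still triggers a roll independently
    log.file.size=1073741824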

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Sat Aug 25 00:34:27 2012
@@ -91,12 +91,26 @@ object Log {
 /**
  * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size
  */
-class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time) extends Range {
+  var firstAppendTime: Option[Long] = None
   @volatile var deleted = false
   /* Return the size in bytes of this log segment */
   def size: Long = messageSet.sizeInBytes()
   /* Return the absolute end offset of this log segment */
   def absoluteEndOffset: Long = start + messageSet.sizeInBytes()
+
+  def updateFirstAppendTime() {
+    if (firstAppendTime.isEmpty)
+      firstAppendTime = Some(time.milliseconds)
+  }
+
+  def append(messages: ByteBufferMessageSet) {
+    if (messages.sizeInBytes > 0) {
+      messageSet.append(messages)
+      updateFirstAppendTime()
+    }
+  }
+
   override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
 
   /**
@@ -114,7 +128,9 @@ class LogSegment(val file: File, val mes
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, time: Time, brokerId: Int = 0) extends Logging {
+private[kafka] class Log( val dir: File, val maxSize: Long,
+                          val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean,
+                          time: Time, brokerId: Int = 0) extends Logging {
   this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
 
   import kafka.log.Log._
@@ -150,7 +166,7 @@ private[kafka] class Log( val dir: File,
         val filename = file.getName()
         val start = filename.substring(0, filename.length - FileSuffix.length).toLong
         val messageSet = new FileMessageSet(file, false)
-        logSegments.add(new LogSegment(file, messageSet, start))
+        logSegments.add(new LogSegment(file, messageSet, start, time))
       }
     }
 
@@ -158,7 +174,7 @@ private[kafka] class Log( val dir: File,
       // no existing segments, create a new mutable segment
       val newFile = new File(dir, nameFromOffset(0))
       val set = new FileMessageSet(newFile, true)
-      logSegments.add(new LogSegment(newFile, set, 0))
+      logSegments.add(new LogSegment(newFile, set, 0, time))
     } else {
       // there is at least one existing segment, validate and recover them/it
       // sort segments into ascending order for fast searching
@@ -175,7 +191,7 @@ private[kafka] class Log( val dir: File,
       val last = logSegments.remove(logSegments.size - 1)
       last.messageSet.close()
       info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode,
recovery " + needRecovery)
-      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)),
last.start)
+      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)),
last.start, time)
       logSegments.add(mutable)
     }
     new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
@@ -242,10 +258,11 @@ private[kafka] class Log( val dir: File,
     // they are valid, insert them in the log
     lock synchronized {
       try {
-        val segment = segments.view.last
-        segment.messageSet.append(validMessages)
-        maybeFlush(numberOfMessages)
+        var segment = segments.view.last
         maybeRoll(segment)
+        segment = segments.view.last
+        segment.append(validMessages)
+        maybeFlush(numberOfMessages)
       }
       catch {
         case e: IOException =>
@@ -307,7 +324,8 @@ private[kafka] class Log( val dir: File,
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
-    if(segment.messageSet.sizeInBytes > maxSize)
+    if ((segment.messageSet.sizeInBytes > maxSize) ||
+       ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
       roll()
   }
 
@@ -324,7 +342,7 @@ private[kafka] class Log( val dir: File,
         newFile.delete()
       }
       debug("Rolling log '" + name + "' to " + newFile.getName())
-      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time))
     }
   }
 
@@ -400,7 +418,7 @@ private[kafka] class Log( val dir: File,
       val deletedSegments = segments.trunc(segments.view.size)
       val newFile = new File(dir, Log.nameFromOffset(newOffset))
       debug("Truncate and start log '" + name + "' to " + newFile.getName())
-      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time))
       deleteSegments(deletedSegments)
     }
   }
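
Taken together, the Log.scala changes mean a segment now rolls when it either exceeds maxSize in bytes or its first append is older than rollIntervalMs; a segment that has never had a non-empty append records no firstAppendTime and so is never rolled on time alone. A self-contained sketch of that predicate (illustrative only; nowMs stands in for time.milliseconds):

    // combined size/time roll condition, mirroring maybeRoll above
    def shouldRoll(sizeInBytes: Long,
                   firstAppendTime: Option[Long],
                   nowMs: Long,
                   maxSize: Long,
                   rollIntervalMs: Long): Boolean =
      sizeInBytes > maxSize ||
        firstAppendTime.exists(first => nowMs - first > rollIntervalMs)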

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Sat Aug 25 00:34:27 2012
@@ -32,18 +32,20 @@ import kafka.common.{KafkaException, Inv
 private[kafka] class LogManager(val config: KafkaConfig,
                                 scheduler: KafkaScheduler,
                                 private val time: Time,
+                                val logRollDefaultIntervalMs: Long,
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
                                 needRecovery: Boolean) extends Logging {
 
   val logDir: File = new File(config.logDir)
   private val numPartitions = config.numPartitions
-  private val maxSize: Long = config.logFileSize
+  private val logFileSizeMap = config.logFileSizeMap
   private val flushInterval = config.flushInterval
   private val logCreationLock = new Object
   private val logFlushIntervals = config.flushIntervalMap
-  private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
-  private val logRetentionSize = config.logRetentionSize
+  private val logRetentionSizeMap = config.logRetentionSizeMap
+  private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
+  private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
   this.logIdent = "[Log Manager on Broker " + config.brokerId + "], "
 
   /* Initialize a log for each subdirectory of the main log directory */
@@ -61,7 +63,10 @@ private[kafka] class LogManager(val conf
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery, time, config.brokerId)
+        val topic = Utils.getTopicPartition(dir.getName)._1
+        val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
+        val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
+        val log = new Log(dir, maxLogFileSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
         val topicPartition = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartition._1)
@@ -101,7 +106,9 @@ private[kafka] class LogManager(val conf
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false, time, config.brokerId)
+      val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
+      val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
+      new Log(d, maxLogFileSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
     }
   }
 
@@ -159,7 +166,7 @@ private[kafka] class LogManager(val conf
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
     val topic = Utils.getTopicPartition(log.name)._1
-    val logCleanupThresholdMs = logRetentionMs.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
     val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMs)
     val total = log.deleteSegments(toBeDeleted)
     total
@@ -170,8 +177,10 @@ private[kafka] class LogManager(val conf
    *  is at least logRetentionSize bytes in size
    */
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
-    if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
-    var diff = log.size - logRetentionSize
+    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
+    if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
+    var diff = log.size - maxLogRetentionSize
     def shouldDelete(segment: LogSegment) = {
       if(diff - segment.size >= 0) {
         diff -= segment.size
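
All of the per-topic settings in LogManager are resolved the same way: take the topic from the log directory name and fall back to the broker-wide value when no override is configured. A hedged sketch of that lookup (overrideOrDefault is illustrative, not a helper in this patch):

    // per-topic override with broker-wide fallback, as used for roll interval,
    // file size and retention size above
    def overrideOrDefault[V](overrides: Map[String, V], topic: String, default: V): V =
      overrides.getOrElse(topic, default)

    // e.g. overrideOrDefault(logRollMsMap, topic, logRollDefaultIntervalMs)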

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Aug 25 00:34:27 2012
@@ -67,22 +67,34 @@ class KafkaConfig(props: Properties) ext
   
   /* the maximum size of a single log file */
   val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
-  
-  /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
-  
+
+  /* the maximum size of a single log file for some specific topic */
+  val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
+
+  /* the maximum time before a new log segment is rolled out */
+  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
+
+  /* the number of hours before rolling out a new log segment for some specific topic */
+  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
+
   /* the number of hours to keep a log file before deleting it */
-  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
-  
+  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
+
+  /* the number of hours to keep a log file before deleting it for some specific topic*/
+  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+
   /* the maximum size of the log before deleting it */
   val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
 
-  /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+  /* the maximum size of the log for some specific topic before deleting it */
+  val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
-  
+
+  /* the number of messages accumulated on a log partition before messages are flushed to disk */
+  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Sat Aug 25 00:34:27 2012
@@ -67,6 +67,7 @@ class KafkaServer(val config: KafkaConfi
     logManager = new LogManager(config,
                                 kafkaScheduler,
                                 time,
+                                1000L * 60 * 60 * config.logRollHours,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
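
KafkaServer passes the new roll interval to LogManager in milliseconds; a one-line illustration of the conversion applied above (the 24*7-hour figure mirrors the log.roll.hours default):

    // 1000L * 60 * 60 * config.logRollHours with the default of 24*7 hours
    val logRollDefaultIntervalMs: Long = 1000L * 60 * 60 * (24 * 7)  // 604800000 ms, i.e. one week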

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Aug 25 00:34:27 2012
@@ -685,12 +685,30 @@ object Utils extends Logging {
     csvString
   }
 
-  def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
+  def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties:
"
     val successMsg =  "The retention hour for "
     getCSVMap(retentionHours, exceptionMsg, successMsg)
   }
 
+  def getTopicRollHours(rollHours: String) : Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: "
+    val successMsg =  "The roll hour for "
+    getCSVMap(rollHours, exceptionMsg, successMsg)
+  }
+
+  def getTopicFileSize(fileSizes: String): Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: "
+    val successMsg =  "The roll hour for "
+    getCSVMap(fileSizes, exceptionMsg, successMsg)
+  }
+
+  def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = {
+    val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties:
"
+    val successMsg =  "The roll hour for "
+    getCSVMap(retentionSizes, exceptionMsg, successMsg)
+  }
+
   def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties:
"
     val successMsg =  "The flush interval for "
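
Each of the new Utils helpers delegates to the existing getCSVMap, so they all accept the same comma-separated topic:value format used by topic.flush.intervals.ms. A rough, self-contained sketch of that parsing (illustrative only, not the actual getCSVMap implementation):

    // parse "topic1:12,topic2:168" into a Map; getCSVMap does the real work with logging and validation
    def parseTopicValueCsv(csv: String): Map[String, Long] =
      csv.split(",").filter(_.trim.nonEmpty).map { token =>
        val Array(topic, value) = token.split(":").map(_.trim)
        topic -> value.toLong
      }.toMap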

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala Sat Aug 25 00:34:27 2012
@@ -30,7 +30,7 @@ object TestLogPerformance {
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, false, SystemTime)
+    val log = new Log(dir, 50*1024*1024, 5000000, 24*7*60*60*1000L, false, SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sat Aug 25 00:34:27 2012
@@ -30,6 +30,7 @@ import kafka.utils._
 class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   val time: MockTime = new MockTime()
+  val maxRollInterval = 100
   val maxLogAge = 1000
   var logDir: File = null
   var logManager: LogManager = null
@@ -47,7 +48,7 @@ class LogManagerTest extends JUnit3Suite
                    override val flushInterval = 100
                  }
     scheduler.startup
-    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
+    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
 
@@ -120,11 +121,11 @@ class LogManagerTest extends JUnit3Suite
     logManager.shutdown()
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
-      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
       override val flushInterval = 100
     }
-    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, retentionMs, false)
+    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs, false)
     logManager.startup
 
     // create a log
@@ -141,11 +142,11 @@ class LogManagerTest extends JUnit3Suite
     log.flush
 
     // should be exactly 100 full segments + 1 new empty one
-    assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
+    assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     logManager.cleanupLogs()
-    assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
+    assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -167,7 +168,7 @@ class LogManagerTest extends JUnit3Suite
                    override val flushInterval = Int.MaxValue
                    override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
                  }
-    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
+    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
     val log = logManager.getOrCreateLog(name, 0)
     for(i <- 0 until 200) {
@@ -188,7 +189,7 @@ class LogManagerTest extends JUnit3Suite
                    override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
                    override val flushInterval = 100
                  }
-    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
+    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
 
     for(i <- 0 until 1) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1377165&r1=1377164&r2=1377165&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Sat Aug 25 00:34:27 2012
@@ -24,7 +24,8 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.common.{KafkaException, OffsetOutOfRangeException}
-import kafka.utils.{MockTime, Utils, TestUtils, Range}
+import kafka.utils._
+import scala.Some
 
 class LogTest extends JUnitSuite {
   
@@ -45,18 +46,70 @@ class LogTest extends JUnitSuite {
     for(offset <- offsets)
       new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
   }
-  
+
+  /** Test that the size and time based log segment rollout works. */
+  @Test
+  def testTimeBasedLogRoll() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val rollMs = 1 * 60 * 60L
+    val time: MockTime = new MockTime()
+
+    // create a log
+    val log = new Log(logDir, 1000, 1000, rollMs, false, time)
+    time.currentMs += rollMs + 1
+
+    // segment age is less than its limit
+    log.append(set)
+    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+    log.append(set)
+    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+    // segment expires in age
+    time.currentMs += rollMs + 1
+    log.append(set)
+    assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+
+    time.currentMs += rollMs + 1
+    val blank = Array[Message]()
+    log.append(new ByteBufferMessageSet(blank:_*))
+    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+
+    time.currentMs += rollMs + 1
+    // the last segment expired in age, but was blank. So new segment should not be generated
+    log.append(set)
+    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+  }
+
+  @Test
+  def testSizeBasedLogRoll() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val setSize = set.sizeInBytes
+    val msgPerSeg = 10
+    val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
+
+    // create a log
+    val log = new Log(logDir, logFileSize, 1000, 10000, false, time)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    // segments expire in size
+    for (i<- 1 to (msgPerSeg + 1)) {
+      log.append(set)
+    }
+    assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+  }
+
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, false, time)
+    new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
   }
 
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, false, time)
+      new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: KafkaException => "This is good"
@@ -65,7 +118,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, false, time)
+    val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -82,7 +135,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, false, time)
+    val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024,
1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -102,7 +155,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, false, time)
+    val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -157,7 +210,7 @@ class LogTest extends JUnitSuite {
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, false, time)
+      val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
       val curOffset = log.logEndOffset
       assertEquals(curOffset, 0)
 
@@ -170,7 +223,7 @@ class LogTest extends JUnitSuite {
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, false, time)
+      val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))


