kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1396102 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/ main/scala/kafka/tools/ test/scala/unit/kafka/log/ test/scala/unit/kafka/message/
Date Tue, 09 Oct 2012 16:40:57 GMT
Author: jkreps
Date: Tue Oct  9 16:40:56 2012
New Revision: 1396102

URL: http://svn.apache.org/viewvc?rev=1396102&view=rev
Log:
KAFKA-551 Remove the concept of immutable log segments--now all indexes and message sets are
mutable. This allows truncating old segments. Reviewed by Jun and Neha.
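
For orientation before the diff: the net effect is that FileMessageSet and
OffsetIndex no longer take a mutable flag, so every segment stays writable for
its whole lifetime. A minimal usage sketch of the post-commit constructors
(the file paths are invented for illustration, not taken from the patch):

    import java.io.File
    import kafka.log.{FileMessageSet, OffsetIndex}

    // every message set is now writable; no mutable argument
    val ms = new FileMessageSet(new File("/tmp/00000000000000000000.log"))

    // likewise for the index: just file, base offset, and max size
    val idx = new OffsetIndex(new File("/tmp/00000000000000000000.index"),
                              baseOffset = 0L, maxIndexSize = 80)

    // truncation, the point of this change, works on any segment
    ms.truncateTo(0L)
    idx.trimToSize()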


Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala Tue Oct  9 16:40:56 2012
@@ -31,45 +31,35 @@ import kafka.metrics.{KafkaTimer, KafkaM
 /**
  * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
  * will fail on an immutable message set. An optional limit and start position can be applied to the message set
- * which will control the position in the file at which the set begins
+ * which will control the position in the file at which the set begins.
  */
 @nonthreadsafe
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
-                                    private[log] val start: Long, // the starting position in the file
-                                    private[log] val limit: Long, // the length (may be less than the file length)
-                                    val mutable: Boolean) extends MessageSet with Logging {
+                                    private[log] val start: Long = 0L,
+                                    private[log] val limit: Long = Long.MaxValue) extends MessageSet with Logging {
   
-  private val setSize = new AtomicLong()
-
-  if(mutable) {
-    if(limit < Long.MaxValue || start > 0)
-      throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
-
-    setSize.set(channel.size())
-    channel.position(channel.size)
-  } else {
-    setSize.set(scala.math.min(channel.size(), limit) - start)
-  }
+  /* the size of the message set in bytes */
+  private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
+    
+  /* set the file position to the last byte in the file */
+  channel.position(channel.size)
   
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, channel: FileChannel, mutable: Boolean) = 
-    this(file, channel, 0, Long.MaxValue, mutable)
+  def this(file: File, channel: FileChannel) = this(file, channel, 0, Long.MaxValue)
   
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, mutable: Boolean) = 
-    this(file, Utils.openChannel(file, mutable), mutable)
+  def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
   
   /**
   * Return a message set which is a view into this set starting from the given position and with the given size limit.
    */
   def read(position: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()),
-      false)
+    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()))
   }
   
   /**
@@ -79,7 +69,7 @@ class FileMessageSet private[kafka](val 
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(12)
-    val size = setSize.get()
+    val size = _size.get()
     while(position + 12 < size) {
       buffer.rewind()
       channel.read(buffer, position)
@@ -138,29 +128,22 @@ class FileMessageSet private[kafka](val 
   /**
    * The number of bytes taken up by this file set
    */
-  def sizeInBytes(): Long = setSize.get()
-
-  def checkMutable(): Unit = {
-    if(!mutable)
-      throw new KafkaException("Attempt to invoke mutation on immutable message set.")
-  }
+  def sizeInBytes(): Long = _size.get()
   
   /**
    * Append this message to the message set
    */
   def append(messages: MessageSet): Unit = {
-    checkMutable()
     var written = 0L
     while(written < messages.sizeInBytes)
       written += messages.writeTo(channel, 0, messages.sizeInBytes)
-    setSize.getAndAdd(written)
+    _size.getAndAdd(written)
   }
  
   /**
    * Commit all written data to the physical disk
    */
   def flush() = {
-    checkMutable()
     LogFlushStats.logFlushTimer.time {
       channel.force(true)
     }
@@ -170,8 +153,7 @@ class FileMessageSet private[kafka](val 
    * Close this message set
    */
   def close() {
-    if(mutable)
-      flush()
+    flush()
     channel.close()
   }
   
@@ -188,13 +170,12 @@ class FileMessageSet private[kafka](val 
    * given size falls on a valid byte offset.
    */
   def truncateTo(targetSize: Long) = {
-    checkMutable()
     if(targetSize > sizeInBytes())
       throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
         " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)
     channel.position(targetSize)
-    setSize.set(targetSize)
+    _size.set(targetSize)
   }
   
 }
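
A quick orientation to the class as it stands after this hunk (a hypothetical
sketch, not code from the repository): append() grows the _size counter,
read() returns a windowed view over the same channel, and truncateTo() shrinks
both the file and the counter.

    import java.io.File
    import kafka.log.FileMessageSet
    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    val set = new FileMessageSet(new File("/tmp/sketch.log"))  // invented path
    set.append(new ByteBufferMessageSet(NoCompressionCodec, new Message("hi".getBytes)))
    val view = set.read(0, 1024)  // a view: same channel, shifted start/limit
    set.truncateTo(0L)            // throws KafkaException if target > sizeInBytes()
    set.close()                   // close() now always flushes before closing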

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Oct  9 16:40:56 2012
@@ -101,8 +101,8 @@ object Log {
 private[kafka] class Log(val dir: File, 
                          val maxLogFileSize: Long, 
                          val maxMessageSize: Int, 
-                         val flushInterval: Int,
-                         val rollIntervalMs: Long, 
+                         val flushInterval: Int = Int.MaxValue,
+                         val rollIntervalMs: Long = Long.MaxValue, 
                          val needsRecovery: Boolean, 
                          val maxIndexSize: Int = (10*1024*1024),
                          val indexIntervalBytes: Int = 4096,
@@ -151,8 +151,7 @@ private[kafka] class Log(val dir: File, 
         if(!Log.indexFilename(dir, start).exists)
           throw new IllegalStateException("Found log file with no corresponding index file.")
         logSegments.add(new LogSegment(dir = dir, 
-                                       startOffset = start, 
-                                       mutable = false, 
+                                       startOffset = start,
                                        indexIntervalBytes = indexIntervalBytes, 
                                        maxIndexSize = maxIndexSize))
       }
@@ -161,8 +160,7 @@ private[kafka] class Log(val dir: File, 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment
       logSegments.add(new LogSegment(dir = dir, 
-                                     startOffset = 0, 
-                                     mutable = true, 
+                                     startOffset = 0,
                                      indexIntervalBytes = indexIntervalBytes, 
                                      maxIndexSize = maxIndexSize))
     } else {
@@ -176,17 +174,9 @@ private[kafka] class Log(val dir: File, 
         }
       })
 
-      //make the final section mutable and run recovery on it if necessary
-      val last = logSegments.remove(logSegments.size - 1)
-      last.close()
-      val mutableSegment = new LogSegment(dir = dir, 
-                                          startOffset = last.start, 
-                                          mutable = true, 
-                                          indexIntervalBytes = indexIntervalBytes, 
-                                          maxIndexSize = maxIndexSize)
+      // run recovery on the last segment if necessary
       if(needsRecovery)
-        recoverSegment(mutableSegment)
-      logSegments.add(mutableSegment)
+        recoverSegment(logSegments.get(logSegments.size - 1))
     }
     new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
   }
@@ -406,12 +396,11 @@ private[kafka] class Log(val dir: File, 
     }
     debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
     segments.view.lastOption match {
-      case Some(segment) => segment.index.makeReadOnly()
+      case Some(segment) => segment.index.trimToSize()
       case None => 
     }
     val segment = new LogSegment(dir, 
                                  startOffset = newOffset,
-                                 mutable = true, 
                                  indexIntervalBytes = indexIntervalBytes, 
                                  maxIndexSize = maxIndexSize)
     segments.append(segment)
@@ -546,8 +535,7 @@ private[kafka] class Log(val dir: File, 
       val deletedSegments = segments.trunc(segments.view.size)
       debug("Truncate and start log '" + name + "' to " + newOffset)
       segments.append(new LogSegment(dir, 
-                                     newOffset, 
-                                     mutable = true, 
+                                     newOffset,
                                      indexIntervalBytes = indexIntervalBytes, 
                                      maxIndexSize = maxIndexSize))
       deleteSegments(deletedSegments)
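
Taken together, the Log.scala hunks simplify the segment lifecycle: loading no
longer rebuilds the last segment as a special mutable copy, and rolling trims
the previous index instead of freezing it. A condensed paraphrase of the new
flow (not verbatim source):

    // on load: all segments are writable, so recovery happens in place
    if(needsRecovery)
      recoverSegment(logSegments.get(logSegments.size - 1))

    // on roll: shrink the previous index file, then append a fresh segment
    segments.view.lastOption match {
      case Some(seg) => seg.index.trimToSize()  // was makeReadOnly()
      case None =>
    }
    segments.append(new LogSegment(dir,
                                   startOffset = newOffset,
                                   indexIntervalBytes = indexIntervalBytes,
                                   maxIndexSize = maxIndexSize))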

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Tue Oct  9 16:40:56 2012
@@ -28,9 +28,9 @@ class LogSegment(val messageSet: FileMes
   
   @volatile var deleted = false
   
-  def this(dir: File, startOffset: Long, mutable: Boolean, indexIntervalBytes: Int, maxIndexSize: Int) = 
-    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), mutable = mutable), 
-         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, mutable = mutable, maxIndexSize = maxIndexSize),
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = 
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
+         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
          SystemTime)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala Tue Oct  9 16:40:56 2012
@@ -49,39 +49,36 @@ import kafka.utils._
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, maxIndexSize: Int = -1) extends Logging {
-
+class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) extends Logging {
+  
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
     {
       val newlyCreated = file.createNewFile()
       val raf = new RandomAccessFile(file, "rw")
       try {
-        if(mutable) {
-          /* if mutable create and memory map a new sparse file */
+        /* pre-allocate the file if necessary */
+        if(newlyCreated) {
           if(maxIndexSize < 8)
             throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
+        }
           
-          /* pre-allocate the file if necessary */
-          if(newlyCreated)
-            raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-          val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
+        val len = raf.length()  
+        if(len < 0 || len % 8 != 0)
+          throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
+                                          " bytes which is not positive or not a multiple of 8.")
           
-          /* set the position in the index for the next entry */
-          if(newlyCreated)
-            idx.position(0)
-          else
-            // if this is a pre-existing index, assume it is all valid and set position to last entry
-            idx.position(roundToExactMultiple(idx.limit, 8))
-          idx
-        } else {
-          /* if not mutable, just mmap what they gave us */
-          val len = raf.length()
-          if(len < 0 || len % 8 != 0)
-            throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
-                                            " bytes which is not positive or not a multiple of 8.")
-          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
-        }
+        /* memory-map the file */
+        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+          
+        /* set the position in the index for the next entry */
+        if(newlyCreated)
+          idx.position(0)
+        else
+          // if this is a pre-existing index, assume it is all valid and set position to last entry
+          idx.position(roundToExactMultiple(idx.limit, 8))
+        idx
       } finally {
         Utils.swallow(raf.close())
       }
@@ -91,7 +88,7 @@ class OffsetIndex(val file: File, val ba
   val maxEntries = mmap.limit / 8
   
   /* the number of entries in the index */
-  private var size = if(mutable) new AtomicInteger(mmap.position / 8) else new AtomicInteger(mmap.limit / 8)
+  private var size = new AtomicInteger(mmap.position / 8)
   
   /* the last offset in the index */
   var lastOffset = readLastOffset()
@@ -115,8 +112,6 @@ class OffsetIndex(val file: File, val ba
    * the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
-    if(entries == 0)
-      return OffsetPosition(baseOffset, 0)
     val idx = mmap.duplicate
     val slot = indexSlotFor(idx, targetOffset)
     if(slot == -1)
@@ -128,16 +123,20 @@ class OffsetIndex(val file: File, val ba
   /**
    * Find the slot in which the largest offset less than or equal to the given
    * target offset is stored.
-   * Return -1 if the least entry in the index is larger than the target offset 
+   * Return -1 if the least entry in the index is larger than the target offset or the index is empty
    */
   private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
     // we only store the difference from the baseoffset so calculate that
     val relativeOffset = targetOffset - baseOffset
     
+    // check if the index is empty
+    if(entries == 0)
+      return -1
+    
     // check if the target offset is smaller than the least offset
     if(logical(idx, 0) > relativeOffset)
       return -1
-    
+      
     // binary search for the entry
     var lo = 0
     var hi = entries-1
@@ -175,8 +174,6 @@ class OffsetIndex(val file: File, val ba
    */
   def append(logicalOffset: Long, position: Int) {
     this synchronized {
-      if(!mutable)
-        throw new IllegalStateException("Attempt to append to an immutable offset index " + file.getName)
       if(isFull)
         throw new IllegalStateException("Attempt to append to a full index (size = " + size
+ ").")
       if(size.get > 0 && logicalOffset <= lastOffset)
@@ -227,17 +224,17 @@ class OffsetIndex(val file: File, val ba
   }
   
   /**
-   * Make this segment read-only, flush any unsaved changes, and truncate any excess bytes
+   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
+   * the file.
    */
-  def makeReadOnly() {
+  def trimToSize() {
     this synchronized {
-      mutable = false
       flush()
       val raf = new RandomAccessFile(file, "rws")
       try {
         val newLength = entries * 8
         raf.setLength(newLength)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, newLength)
+        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength)
       } finally {
         Utils.swallow(raf.close())
       }
@@ -265,8 +262,7 @@ class OffsetIndex(val file: File, val ba
   
   /** Close the index */
   def close() {
-    if(mutable)
-      makeReadOnly()
+    trimToSize()
   }
   
   /**
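
One behavioral consequence, exercised by the reworked OffsetIndexTest below:
close() now trims the file to exactly entries * 8 bytes, so a reopened index
is mapped READ_WRITE but is already full, and further appends still fail. A
hypothetical lifecycle (invented path):

    import java.io.File
    import kafka.log.OffsetIndex

    val idx = new OffsetIndex(new File("/tmp/sketch.index"), baseOffset = 0L, maxIndexSize = 80)
    idx.append(5L, 100)       // one (logical offset, byte position) entry
    idx.lookup(5L)            // OffsetPosition(5, 100)
    idx.close()               // trimToSize(): file shrinks to entries * 8 bytes

    // reopened: lookups work, but the trimmed index is exactly full
    val again = new OffsetIndex(new File("/tmp/sketch.index"), baseOffset = 0L, maxIndexSize = 80)
    again.lookup(5L)          // still OffsetPosition(5, 100)
    // again.append(7L, 200)  // would throw IllegalStateException: index is full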

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Tue Oct  9 16:40:56 2012
@@ -43,7 +43,7 @@ object DumpLogSegments {
   /* print out the contents of the index */
   def dumpIndex(file: File) {
     val startOffset = file.getName().split("\\.")(0).toLong
-    val index = new OffsetIndex(file = file, baseOffset = startOffset, mutable = false)
+    val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
      // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
@@ -57,7 +57,7 @@ object DumpLogSegments {
   def dumpLog(file: File, printContents: Boolean) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
-    val messageSet = new FileMessageSet(file, false)
+    val messageSet = new FileMessageSet(file)
     var validBytes = 0L
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala Tue Oct  9 16:40:56 2012
@@ -29,7 +29,7 @@ class FileMessageSetTest extends BaseMes
   val messageSet = createMessageSet(messages)
   
   def createMessageSet(messages: Seq[Message]): FileMessageSet = {
-    val set = new FileMessageSet(tempFile(), true)
+    val set = new FileMessageSet(tempFile())
     set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
     set.flush()
     set

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala Tue Oct  9 16:40:56 2012
@@ -15,10 +15,10 @@ class LogSegmentTest extends JUnit3Suite
   
   def createSegment(offset: Long): LogSegment = {
     val msFile = TestUtils.tempFile()
-    val ms = new FileMessageSet(msFile, true)
+    val ms = new FileMessageSet(msFile)
     val idxFile = TestUtils.tempFile()
     idxFile.delete()
-    val idx = new OffsetIndex(idxFile, offset, true, 100)
+    val idx = new OffsetIndex(idxFile, offset, 100)
     val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
     segments += seg
     seg

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Oct  9 16:40:56 2012
@@ -342,6 +342,33 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change log size", log.size, 0)
   }
 
+  @Test
+  def testReopenThenTruncate() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+
+    // create a log
+    var log = new Log(logDir, 
+                      maxLogFileSize = set.sizeInBytes * 5, 
+                      maxMessageSize = config.maxMessageSize, 
+                      maxIndexSize = 1000, 
+                      indexIntervalBytes = 10000, 
+                      needsRecovery = true)
+    
+    // add enough messages to roll over several segments then close and re-open and attempt to truncate
+    for(i <- 0 until 100)
+      log.append(set)
+    log.close()
+    log = new Log(logDir, 
+                  maxLogFileSize = set.sizeInBytes * 5, 
+                  maxMessageSize = config.maxMessageSize, 
+                  maxIndexSize = 1000, 
+                  indexIntervalBytes = 10000, 
+                  needsRecovery = true)
+    log.truncateTo(3)
+    assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
+    assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
+  }
+  
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala Tue Oct  9 16:40:56 2012
@@ -33,7 +33,7 @@ class OffsetIndexTest extends JUnitSuite
   
   @Before
   def setup() {
-    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, mutable = true, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
   }
   
   @After
@@ -41,7 +41,7 @@ class OffsetIndexTest extends JUnitSuite
     if(this.idx != null)
       this.idx.file.delete()
   }
-
+  
   @Test
   def randomLookupTest() {
     assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset,
0), idx.lookup(92L))
@@ -88,25 +88,6 @@ class OffsetIndexTest extends JUnitSuite
     }
     assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException])
   }
-
-  
-  @Test
-  def testReadOnly() {
-    /* add some random values */
-    val vals = List((49, 1), (52, 2), (55, 3))
-    for((logical, physical) <- vals)
-      idx.append(logical, physical)
-    
-    idx.makeReadOnly()
-    
-    assertEquals("File length should just contain added entries.", vals.size * 8L, idx.file.length())
-    assertEquals("Last offset field should be initialized", vals.last._1, idx.lastOffset)
-    
-    for((logical, physical) <- vals)
-    	assertEquals("Should still be able to find everything.", OffsetPosition(logical, physical), idx.lookup(logical))
-    	
-    assertWriteFails("Append should fail on read-only index", idx, 60, classOf[IllegalStateException])
-  }
   
   @Test(expected = classOf[IllegalArgumentException])
   def appendOutOfOrder() {
@@ -115,13 +96,13 @@ class OffsetIndexTest extends JUnitSuite
   }
   
   @Test
-  def reopenAsReadonly() {
+  def testReopen() {
     val first = OffsetPosition(51, 0)
     val sec = OffsetPosition(52, 1)
     idx.append(first.offset, first.position)
     idx.append(sec.offset, sec.position)
     idx.close()
-    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset, mutable = false)
+    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
     assertEquals(first, idxRo.lookup(first.offset))
     assertEquals(sec, idxRo.lookup(sec.offset))
     assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException])
@@ -129,7 +110,8 @@ class OffsetIndexTest extends JUnitSuite
   
   @Test
   def truncate() {
-	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, mutable = true, maxIndexSize = 10 * 8)
+	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+	idx.truncate()
     for(i <- 1 until 10)
       idx.append(i, i)
       
@@ -155,13 +137,6 @@ class OffsetIndexTest extends JUnitSuite
       case e: Exception => assertEquals("Got an unexpected exception.", klass, e.getClass)
     }
   }
-  
-  def makeIndex(baseOffset: Long, mutable: Boolean, vals: Seq[(Long, Int)]): OffsetIndex = {
-    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = baseOffset, mutable = mutable, maxIndexSize = 2 * vals.size * 8)
-    for ((logical, physical) <- vals)
-      idx.append(logical, physical)
-    idx
-  }
 
   def monotonicSeq(base: Int, len: Int): Seq[Int] = {
     val rand = new Random(1L)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1396102&r1=1396101&r2=1396102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Tue Oct  9 16:40:56 2012
@@ -67,7 +67,7 @@ trait BaseMessageSetTestCases extends JU
       val channel = new RandomAccessFile(file, "rw").getChannel()
       val written = set.writeTo(channel, 0, 1024)
       assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-      val newSet = new FileMessageSet(file, channel, false)
+      val newSet = new FileMessageSet(file, channel)
       checkEquals(set.iterator, newSet.iterator)
     }
   }


