kafka-commits mailing list archives

From: jkr...@apache.org
Subject: svn commit: r1416253 [1/2] - in /kafka/trunk: core/src/main/scala/kafka/log/ core/src/main/scala/kafka/message/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/server/ core/src/main/scala/kafka/tools/ core/src/main/scala/kafka/utils/ core...
Date: Sun, 02 Dec 2012 20:50:03 GMT
Author: jkreps
Date: Sun Dec  2 20:50:01 2012
New Revision: 1416253

URL: http://svn.apache.org/viewvc?rev=1416253&view=rev
Log:
KAFKA-521 Refactor the log subsystem. Patch reviewed by Neha.


Modified:
    kafka/trunk/core/src/main/scala/kafka/log/FileMessageSet.scala
    kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    kafka/trunk/core/src/main/scala/kafka/log/LogSegment.scala
    kafka/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala
    kafka/trunk/core/src/main/scala/kafka/log/package.html
    kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
    kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
    kafka/trunk/core/src/main/scala/kafka/server/KafkaApis.scala
    kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    kafka/trunk/core/src/main/scala/kafka/server/LeaderElector.scala
    kafka/trunk/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    kafka/trunk/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
    kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala
    kafka/trunk/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
    kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
    kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
    kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java

Modified: kafka/trunk/core/src/main/scala/kafka/log/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/FileMessageSet.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/FileMessageSet.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/FileMessageSet.scala Sun Dec  2 20:50:01 2012
@@ -29,54 +29,83 @@ import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
 /**
- * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
- * will fail on an immutable message set. An optional limit and start position can be applied to the message set
- * which will control the position in the file at which the set begins.
+ * An on-disk message set. An optional start and end position can be applied to the message set
+ * which will allow slicing a subset of the file.
+ * @param file The file name for the underlying log data
+ * @param channel the underlying file channel used
+ * @param start A lower bound on the absolute position in the file from which the message set begins
+ * @param end The upper bound on the absolute position in the file at which the message set ends
+ * @param isSlice Should the start and end parameters be used for slicing?
  */
 @nonthreadsafe
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
-                                    private[log] val start: Int = 0,
-                                    private[log] val limit: Int = Int.MaxValue,
-                                    initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {
+                                    private[log] val start: Int,
+                                    private[log] val end: Int,
+                                    isSlice: Boolean) extends MessageSet with Logging {
   
   /* the size of the message set in bytes */
-  private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
+  private val _size = 
+    if(isSlice)
+      new AtomicInteger(end - start) // don't check the file size if this is just a slice view
+    else
+      new AtomicInteger(math.min(channel.size().toInt, end) - start)
 
-  if (initChannelPositionToEnd) {
-    /* set the file position to the last byte in the file */
+  /* if this is not a slice, update the file pointer to the end of the file */
+  if (!isSlice)
     channel.position(channel.size)
-  }
 
   /**
-   * Create a file message set with no limit or offset
+   * Create a file message set with no slicing.
+   */
+  def this(file: File, channel: FileChannel) = 
+    this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
+  
+  /**
+   * Create a file message set with no slicing
    */
-  def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue)
+  def this(file: File) = 
+    this(file, Utils.openChannel(file, mutable = true))
   
   /**
-   * Create a file message set with no limit or offset
+   * Create a slice view of the file message set that begins and ends at the given byte offsets
    */
-  def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
+  def this(file: File, channel: FileChannel, start: Int, end: Int) = 
+    this(file, channel, start, end, isSlice = true)
   
   /**
    * Return a message set which is a view into this set starting from the given position and with the given size limit.
+   * 
+   * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
+   * 
+   * If this message set is already sliced, the position will be taken relative to that slicing.
+   * 
+   * @param position The start position to begin the read from
+   * @param size The number of bytes after the start position to include
+   * 
+   * @return A sliced wrapper on this message set limited based on the given position and size
    */
   def read(position: Int, size: Int): FileMessageSet = {
+    if(position < 0)
+      throw new IllegalArgumentException("Invalid position: " + position)
+    if(size < 0)
+      throw new IllegalArgumentException("Invalid size: " + size)
     new FileMessageSet(file,
                        channel,
-                       this.start + position,
-                       scala.math.min(this.start + position + size, sizeInBytes()),
-                       false)
+                       start = this.start + position,
+                       end = math.min(this.start + position + size, sizeInBytes()))
   }
   
   /**
    * Search forward for the file position of the last offset that is greater than or equal to the target offset 
    * and return its physical position. If no such offsets are found, return null.
+   * @param targetOffset The offset to search for.
+   * @param startingPosition The starting position in the file to begin searching from.
    */
-  private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
-    val size = _size.get()
+    val size = sizeInBytes()
     while(position + MessageSet.LogOverhead < size) {
       buffer.rewind()
       channel.read(buffer, position)
@@ -94,19 +123,34 @@ class FileMessageSet private[kafka](val 
   }
   
   /**
-   * Write some of this set to the given channel, return the amount written
+   * Write some of this set to the given channel.
+   * @param destChannel The channel to write to.
+   * @param writePosition The position in the message set to begin writing from.
+   * @param size The maximum number of bytes to write
+   * @return The number of bytes actually written.
    */
   def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
-    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+    channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
   
   /**
+   * Get a shallow iterator over the messages in the set.
+   */
+  override def iterator() = iterator(Int.MaxValue)
+    
+  /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.
+   * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory. 
+   * If we encounter a message larger than this we throw an InvalidMessageException.
+   * @return The iterator.
    */
-  override def iterator: Iterator[MessageAndOffset] = {
+  def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
     new IteratorTemplate[MessageAndOffset] {
       var location = start
       
       override def makeNext(): MessageAndOffset = {
+        if(location >= end)
+          return allDone()
+          
         // read the size of the item
         val sizeOffsetBuffer = ByteBuffer.allocate(12)
         channel.read(sizeOffsetBuffer, location)
@@ -116,8 +160,10 @@ class FileMessageSet private[kafka](val 
         sizeOffsetBuffer.rewind()
         val offset = sizeOffsetBuffer.getLong()
         val size = sizeOffsetBuffer.getInt()
-        if (size < Message.MinHeaderSize)
+        if(size < Message.MinHeaderSize)
           return allDone()
+        if(size > maxMessageSize)
+          throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
         
         // read the item itself
         val buffer = ByteBuffer.allocate(size)
@@ -139,7 +185,7 @@ class FileMessageSet private[kafka](val 
   def sizeInBytes(): Int = _size.get()
   
   /**
-   * Append this message to the message set
+   * Append these messages to the message set
    */
   def append(messages: ByteBufferMessageSet) {
     val written = messages.writeTo(channel, 0, messages.sizeInBytes)
@@ -150,9 +196,7 @@ class FileMessageSet private[kafka](val 
    * Commit all written data to the physical disk
    */
   def flush() = {
-    LogFlushStats.logFlushTimer.time {
-      channel.force(true)
-    }
+    channel.force(true)
   }
   
   /**
@@ -165,6 +209,7 @@ class FileMessageSet private[kafka](val 
   
   /**
    * Delete this message set from the filesystem
+   * @return True iff this message set was deleted.
    */
   def delete(): Boolean = {
     Utils.swallow(channel.close())
@@ -172,13 +217,14 @@ class FileMessageSet private[kafka](val 
   }
 
   /**
-   * Truncate this file message set to the given size. Note that this API does no checking that the 
-   * given size falls on a valid byte offset.
+   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the 
+   * given size falls on a valid message boundary.
+   * @param targetSize The size to truncate to.
    */
   def truncateTo(targetSize: Int) = {
-    if(targetSize > sizeInBytes())
-      throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
-        " size of this log segment is only %d bytes".format(sizeInBytes()))
+    if(targetSize > sizeInBytes || targetSize < 0)
+      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
+                               " size of this log segment is " + sizeInBytes + " bytes.")
     channel.truncate(targetSize)
     channel.position(targetSize)
     _size.set(targetSize)

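For reference, the FileMessageSet changes above replace the old start/limit/initChannelPositionToEnd constructor arguments with explicit slice semantics. A rough usage sketch of the new API, not part of the patch (the log path is made up; signatures follow the diff):

import java.io.File
import kafka.log.FileMessageSet

object FileMessageSetSliceSketch {
  def main(args: Array[String]) {
    // open a full, mutable view over an existing (hypothetical) log file
    val fms = new FileMessageSet(new File("/tmp/kafka-logs/test-topic-0/00000000000000000000.log"))

    // read(position, size) now returns another FileMessageSet that is a slice view;
    // positions are relative to this set's own start, so slices of slices compose
    val slice = fms.read(position = 0, size = 4096)
    println("slice holds " + slice.sizeInBytes() + " bytes")

    // shallow iteration, rejecting any message larger than 1MB
    val iter = slice.iterator(maxMessageSize = 1024 * 1024)
    while (iter.hasNext)
      println("offset = " + iter.next().offset)
  }
}
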
Modified: kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/Log.scala Sun Dec  2 20:50:01 2012
@@ -17,76 +17,17 @@
 
 package kafka.log
 
-import kafka.api.OffsetRequest
 import java.io.{IOException, File}
-import java.util.{Comparator, Collections, ArrayList}
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import kafka.utils._
-import scala.math._
+import scala.collection.JavaConversions.asIterable;
 import java.text.NumberFormat
-import kafka.server.BrokerTopicStat
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 
-object Log {
-  val LogFileSuffix = ".log"
-  val IndexFileSuffix = ".index"
-
-  /**
-   * Search for the greatest range with start <= the target value.
-   */
-  def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = {
-    if(ranges.size < 1)
-      return None
-
-    // check out of bounds
-    if(value < ranges(0).start)
-      return None
-
-    var low = 0
-    var high = arraySize - 1
-    while(low < high) {
-      val mid = ceil((high + low) / 2.0).toInt
-      val found = ranges(mid)
-      if(found.start == value)
-        return Some(found)
-      else if (value < found.start)
-        high = mid - 1
-      else
-        low = mid
-    }
-    Some(ranges(low))
-  }
-
-  def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] =
-    findRange(ranges, value, ranges.length)
-
-  /**
-   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
-   * so that ls sorts the files numerically
-   */
-  def filenamePrefixFromOffset(offset: Long): String = {
-    val nf = NumberFormat.getInstance()
-    nf.setMinimumIntegerDigits(20)
-    nf.setMaximumFractionDigits(0)
-    nf.setGroupingUsed(false)
-    nf.format(offset)
-  }
-  
-  def logFilename(dir: File, offset: Long) = 
-    new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
-  
-  def indexFilename(dir: File, offset: Long) = 
-    new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
-
-  def getEmptyOffsets(timestamp: Long): Seq[Long] =
-    if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
-      Seq(0L)
-    else Nil
-}
-
 
 /**
  * An append-only log for storing messages.
@@ -96,19 +37,26 @@ object Log {
  * New log segments are created according to a configurable policy that controls the size in bytes or time interval
  * for a given segment.
  * 
+ * @param dir The directory in which log segments are created.
+ * @param maxSegmentSize The maximum segment size in bytes.
+ * @param maxMessageSize The maximum message size in bytes (including headers) that will be allowed in this log.
+ * @param flushInterval The number of messages that can be appended to this log before we force a flush of the log.
+ * @param rollIntervalMs The time after which we will force the rolling of a new log segment
+ * @param needsRecovery Should we run recovery on this log when opening it? This should be done if the log wasn't cleanly shut down.
+ * @param maxIndexSize The maximum size of an offset index in this log. The index of the active log segment will be pre-allocated to this size.
+ * @param indexIntervalBytes The (approximate) number of bytes between entries in the offset index for this log.
+ * 
  */
 @threadsafe
-private[kafka] class Log(val dir: File, 
-                         val maxLogFileSize: Int,
-                         val maxMessageSize: Int, 
-                         val flushInterval: Int = Int.MaxValue,
-                         val rollIntervalMs: Long = Long.MaxValue, 
-                         val needsRecovery: Boolean, 
-                         val maxIndexSize: Int = (10*1024*1024),
-                         val indexIntervalBytes: Int = 4096,
-                         time: Time = SystemTime,
-                         brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
+class Log(val dir: File, 
+          val maxSegmentSize: Int,
+          val maxMessageSize: Int, 
+          val flushInterval: Int = Int.MaxValue,
+          val rollIntervalMs: Long = Long.MaxValue, 
+          val needsRecovery: Boolean, 
+          val maxIndexSize: Int = (10*1024*1024),
+          val indexIntervalBytes: Int = 4096,
+          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
   
@@ -119,13 +67,13 @@ private[kafka] class Log(val dir: File, 
   private val unflushed = new AtomicInteger(0)
 
   /* last time it was flushed */
-  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
+  private val lastflushedTime = new AtomicLong(time.milliseconds)
 
   /* the actual segments of the log */
-  private[log] val segments: SegmentList[LogSegment] = loadSegments()
+  private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
     
   /* Calculate the offset of the next message */
-  private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
+  private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
 
   newGauge(name + "-" + "NumLogSegments",
            new Gauge[Int] { def getValue = numberOfSegments })
@@ -133,13 +81,13 @@ private[kafka] class Log(val dir: File, 
   newGauge(name + "-" + "LogEndOffset",
            new Gauge[Long] { def getValue = logEndOffset })
 
-  /* The name of this log */
+  /** The name of this log */
   def name  = dir.getName()
 
   /* Load the log segments from the log files on disk */
-  private def loadSegments(): SegmentList[LogSegment] = {
+  private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = {
     // open all the segments read-only
-    val logSegments = new ArrayList[LogSegment]
+    val logSegments = new ConcurrentSkipListMap[Long, LogSegment]
     val ls = dir.listFiles()
     if(ls != null) {
       for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) {
@@ -147,75 +95,46 @@ private[kafka] class Log(val dir: File, 
           throw new IOException("Could not read file " + file)
         val filename = file.getName()
         val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
-        // TODO: we should ideally rebuild any missing index files, instead of erroring out
-        if(!Log.indexFilename(dir, start).exists)
-          throw new IllegalStateException("Found log file with no corresponding index file.")
-        logSegments.add(new LogSegment(dir = dir, 
-                                       startOffset = start,
-                                       indexIntervalBytes = indexIntervalBytes, 
-                                       maxIndexSize = maxIndexSize))
+        val hasIndex = Log.indexFilename(dir, start).exists
+        val segment = new LogSegment(dir = dir, 
+                                     startOffset = start,
+                                     indexIntervalBytes = indexIntervalBytes, 
+                                     maxIndexSize = maxIndexSize)
+        if(!hasIndex) {
+          // this can only happen if someone manually deletes the index file
+          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+          segment.recover(maxMessageSize)
+        }
+        logSegments.put(start, segment)
       }
     }
 
     if(logSegments.size == 0) {
-      // no existing segments, create a new mutable segment
-      logSegments.add(new LogSegment(dir = dir, 
+      // no existing segments, create a new mutable segment beginning at offset 0
+      logSegments.put(0,
+                      new LogSegment(dir = dir, 
                                      startOffset = 0,
                                      indexIntervalBytes = indexIntervalBytes, 
                                      maxIndexSize = maxIndexSize))
     } else {
-      // there is at least one existing segment, validate and recover them/it
-      // sort segments into ascending order for fast searching
-      Collections.sort(logSegments, new Comparator[LogSegment] {
-        def compare(s1: LogSegment, s2: LogSegment): Int = {
-          if(s1.start == s2.start) 0
-          else if(s1.start < s2.start) -1
-          else 1
-        }
-      })
-
-      // reset the index size of the last (current active) log segment to its maximum value
-      logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize)
-
-      // run recovery on the last segment if necessary
-      if(needsRecovery)
-        recoverSegment(logSegments.get(logSegments.size - 1))
-    }
-    new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
-  }
-  
-  /**
-   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
-   */
-  private def recoverSegment(segment: LogSegment) {
-    segment.index.truncate()
-    var validBytes = 0
-    var lastIndexEntry = 0
-    val iter = segment.messageSet.iterator
-    try {
-      while(iter.hasNext) {
-        val entry = iter.next
-        entry.message.ensureValid()
-        if(validBytes - lastIndexEntry > indexIntervalBytes) {
-          segment.index.append(entry.offset, validBytes)
-          lastIndexEntry = validBytes
-        }
-        validBytes += MessageSet.entrySize(entry.message)
+      // reset the index size of the currently active log segment to allow more entries
+      val active = logSegments.lastEntry.getValue
+      active.index.resize(maxIndexSize)
+
+      // run recovery on the active segment if necessary
+      if(needsRecovery) {
+        info("Recovering active segment of %s.".format(name))
+        active.recover(maxMessageSize)
       }
-    } catch {
-      case e: InvalidMessageException => 
-        logger.warn("Found invalid messages in log " + name)
     }
-    val truncated = segment.messageSet.sizeInBytes - validBytes
-    if(truncated > 0)
-      warn("Truncated " + truncated + " invalid bytes from the log " + name + ".")
-    segment.messageSet.truncateTo(validBytes)
+    logSegments
   }
 
   /**
-   * The number of segments in the log
+   * The number of segments in the log.
+   * Take care! This is an O(n) operation.
    */
-  def numberOfSegments: Int = segments.view.length
+  def numberOfSegments: Int = segments.size
 
   /**
    * Close this log
@@ -223,7 +142,7 @@ private[kafka] class Log(val dir: File, 
   def close() {
     debug("Closing log " + name)
     lock synchronized {
-      for(seg <- segments.view)
+      for(seg <- logSegments)
         seg.close()
     }
   }
@@ -234,78 +153,81 @@ private[kafka] class Log(val dir: File, 
    * This method will generally be responsible for assigning offsets to the messages, 
    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
    * 
-   * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set, 
-   * or (-1,-1) if the message set is empty
+   * @param messages The message set to append
+   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+   * 
+   * @throws KafkaStorageException If the append fails due to an I/O error.
+   * 
+   * @return Information about the appended messages including the first and last offset
    */
-  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
-    val messageSetInfo = analyzeAndValidateMessageSet(messages)
+  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
+    val appendInfo = analyzeAndValidateMessageSet(messages)
     
     // if we have any valid messages, append them to the log
-    if(messageSetInfo.count == 0) {
-      (-1L, -1L)
-    } else {
-      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
-      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
-
-      // trim any invalid bytes or partial messages before appending it to the on-disk log
-      var validMessages = trimInvalidBytes(messages)
+    if(appendInfo.count == 0)
+      return appendInfo
+      
+    // trim any invalid bytes or partial messages before appending it to the on-disk log
+    var validMessages = trimInvalidBytes(messages)
 
-      try {
-        // they are valid, insert them in the log
-        val offsets = lock synchronized {
-          // maybe roll the log if this segment is full
-          val segment = maybeRoll(segments.view.last)
+    try {
+      // they are valid, insert them in the log
+      lock synchronized {
+        // maybe roll the log if this segment is full
+        val segment = maybeRoll()
           
-          // assign offsets to the messageset
-          val offsets = 
-            if(assignOffsets) {
-              val firstOffset = nextOffset.get
-              validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec)
-              val lastOffset = nextOffset.get - 1
-              (firstOffset, lastOffset)
-            } else {
-              if(!messageSetInfo.offsetsMonotonic)
-                throw new IllegalArgumentException("Out of order offsets found in " + messages)
-              nextOffset.set(messageSetInfo.lastOffset + 1)
-              (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
-            }
+        if(assignOffsets) {
+           // assign offsets to the messageset
+          appendInfo.firstOffset = nextOffset.get
+          validMessages = validMessages.assignOffsets(nextOffset, appendInfo.codec)
+          appendInfo.lastOffset = nextOffset.get - 1
+        } else {
+          // we are taking the offsets we are given
+          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
+              throw new IllegalArgumentException("Out of order offsets found in " + messages)
+          nextOffset.set(appendInfo.lastOffset + 1)
+        }
           
-          // now append to the log
-          trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
-                .format(this.name, offsets._1, nextOffset.get(), validMessages))
-          segment.append(offsets._1, validMessages)
+        // now append to the log
+        trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
+        segment.append(appendInfo.firstOffset, validMessages)
           
-          // return the offset at which the messages were appended
-          offsets
-        }
-        
         // maybe flush the log and index
-        maybeFlush(messageSetInfo.count)
+        maybeFlush(appendInfo.count)
         
-        // return the first and last offset
-        offsets
-      } catch {
-        case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
+        appendInfo
       }
+    } catch {
+      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
     }
   }
   
-  /* struct to hold various quantities we compute about each message set before appending to the log */
-  case class MessageSetAppendInfo(firstOffset: Long, lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean)
+  /** Struct to hold various quantities we compute about each message set before appending to the log
+   * @param firstOffset The first offset in the message set
+   * @param lastOffset The last offset in the message set
+   * @param codec The codec used in the message set
+   * @param count The number of messages
+   * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+   */
+  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean)
   
   /**
    * Validate the following:
-   * 1. each message is not too large
-   * 2. each message matches its CRC
+   * <ol>
+   * <li> each message is not too large
+   * <li> each message matches its CRC
+   * </ol>
    * 
    * Also compute the following quantities:
-   * 1. First offset in the message set
-   * 2. Last offset in the message set
-   * 3. Number of messages
-   * 4. Whether the offsets are monotonically increasing
-   * 5. Whether any compression codec is used (if many are used, then the last one is given)
+   * <ol>
+   * <li> First offset in the message set
+   * <li> Last offset in the message set
+   * <li> Number of messages
+   * <li> Whether the offsets are monotonically increasing
+   * <li> Whether any compression codec is used (if many are used, then the last one is given)
+   * </ol>
    */
-  private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo = {
+  private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
     var messageCount = 0
     var firstOffset, lastOffset = -1L
     var codec: CompressionCodec = NoCompressionCodec
@@ -332,11 +254,13 @@ private[kafka] class Log(val dir: File, 
       if(messageCodec != NoCompressionCodec)
         codec = messageCodec
     }
-    MessageSetAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
   }
   
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
+   * @param messages The message set to trim
+   * @return A trimmed message set. This may be the same as what was passed in or it may not.
    */
   private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
     val messageSetValidBytes = messages.validBytes
@@ -353,118 +277,131 @@ private[kafka] class Log(val dir: File, 
   }
 
   /**
-   * Read a message set from the log. 
-   * startOffset - The logical offset to begin reading at
-   * maxLength - The maximum number of bytes to read
-   * maxOffset - The maximum logical offset to include in the resulting message set
+   * Read messages from the log
+   * @param startOffset The offset to begin reading at
+   * @param maxLength The maximum number of bytes to read
+   * @param maxOffset The offset to read up to, exclusive (i.e. the first offset NOT included in the resulting message set).
+   * 
+   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
+   * @return The messages read
    */
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
-    val view = segments.view
-        
+
     // check if the offset is valid and in range
-    val first = view.head.start
     val next = nextOffset.get
     if(startOffset == next)
       return MessageSet.Empty
-    else if(startOffset > next || startOffset < first)
-      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, first, next))
     
-    // Do the read on the segment with a base offset less than the target offset
-    // TODO: to handle sparse offsets, we need to skip to the next segment if this read doesn't find anything
-    Log.findRange(view, startOffset, view.length) match {
-      case None => throw new OffsetOutOfRangeException("Offset is earlier than the earliest offset")
-      case Some(segment) => segment.read(startOffset, maxLength, maxOffset)
+    var entry = segments.floorEntry(startOffset)
+      
+    // attempt to read beyond the log end offset is an error
+    if(startOffset > next || entry == null)
+      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
+    
+    // do the read on the segment with a base offset less than the target offset
+    // but if that segment doesn't contain any messages with an offset greater than that
+    // continue to read from successive segments until we get some messages or we reach the end of the log
+    while(entry != null) {
+      val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
+      if(messages == null)
+        entry = segments.higherEntry(entry.getKey)
+      else
+        return messages
     }
+    
+    // okay we are beyond the end of the last segment but less than the log end offset
+    MessageSet.Empty
   }
 
   /**
-   * Delete any log segments matching the given predicate function
-   */
-  def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
-    lock synchronized {
-      val view = segments.view
-      val deletable = view.takeWhile(predicate)
-      for(seg <- deletable)
-        seg.deleted = true
-      var numToDelete = deletable.size
-      // if we are deleting everything, create a new empty segment
-      if(numToDelete == view.size) {
-        if (view(numToDelete - 1).size > 0)
+   * Delete any log segments matching the given predicate function,
+   * starting with the oldest segment and moving forward until a segment doesn't match.
+   * @param predicate A function that takes in a single log segment and returns true iff it is deletable
+   * @return The number of segments deleted
+   */
+  def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
+    // find any segments that match the user-supplied predicate UNLESS it is the final segment 
+    // and it is empty (since we would just end up re-creating it)
+    val lastSegment = activeSegment
+    var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
+    val numToDelete = deletable.size
+    if(numToDelete > 0) {
+      lock synchronized {
+        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
+        if(segments.size == numToDelete)
           roll()
-        else {
-          // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
-          // file name. So simply reuse the last segment and reset the modified time.
-          view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds)
-          numToDelete -=1
-        }
+        // remove the segments for lookups
+        deletable.foreach(d => segments.remove(d.baseOffset))
       }
-      segments.trunc(numToDelete)
+      // do not lock around actual file deletion, it isn't O(1) on many filesystems
+      deletable.foreach(_.delete())
     }
+    numToDelete
   }
 
   /**
-   * Get the size of the log in bytes
+   * The size of the log in bytes
    */
-  def size: Long = segments.view.foldLeft(0L)(_ + _.size)
+  def size: Long = logSegments.map(_.size).sum
 
   /**
-   *  Get the offset of the next message that will be appended
+   *  The offset of the next message that will be appended to the log
    */
   def logEndOffset: Long = nextOffset.get
 
   /**
-   * Roll the log over if necessary
+   * Roll the log over to a new empty log segment if necessary
+   * @return The currently active segment after (perhaps) rolling to a new segment
    */
-  private def maybeRoll(segment: LogSegment): LogSegment = {
-    if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
-       ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) ||
+  private def maybeRoll(): LogSegment = {
+    val segment = activeSegment
+    if ((segment.size > maxSegmentSize) ||
+       (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) ||
        segment.index.isFull)
       roll()
     else
       segment
   }
-
+  
   /**
-   * Create a new segment and make it active, and return it
+   * Roll the log over to a new active segment starting with the current logEndOffset.
+   * This will trim the index to the exact size of the number of entries it currently contains.
+   * @return The newly rolled segment
    */
   def roll(): LogSegment = {
     lock synchronized {
-      flush()
-      rollToOffset(logEndOffset)
-    }
-  }
-  
-  /**
-   * Roll the log over to the given new offset value
-   */
-  private def rollToOffset(newOffset: Long): LogSegment = {
-    val logFile = logFilename(dir, newOffset)
-    val indexFile = indexFilename(dir, newOffset)
-    for(file <- List(logFile, indexFile); if file.exists) {
-      warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
-      file.delete()
-    }
-    debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
-    segments.view.lastOption match {
-      case Some(segment) => segment.index.trimToValidSize()
-      case None => 
+      // flush the log to ensure that only the active segment needs to be recovered
+      if(!segments.isEmpty())
+        flush()
+  
+      val newOffset = logEndOffset
+      val logFile = logFilename(dir, newOffset)
+      val indexFile = indexFilename(dir, newOffset)
+      for(file <- List(logFile, indexFile); if file.exists) {
+        warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
+        file.delete()
+      }
+    
+      debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
+      segments.lastEntry() match {
+        case null => 
+        case entry => entry.getValue.index.trimToValidSize()
+      }
+      val segment = new LogSegment(dir, 
+                                   startOffset = newOffset,
+                                   indexIntervalBytes = indexIntervalBytes, 
+                                   maxIndexSize = maxIndexSize)
+      val prev = segments.put(segment.baseOffset, segment)
+      if(prev != null)
+        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
+      segment
     }
-
-    val segmentsView = segments.view
-    if(segmentsView.size > 0 && segmentsView.last.start == newOffset)
-      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset))
-
-    val segment = new LogSegment(dir, 
-                                 startOffset = newOffset,
-                                 indexIntervalBytes = indexIntervalBytes, 
-                                 maxIndexSize = maxIndexSize)
-    segments.append(segment)
-    segment
   }
 
   /**
    * Flush the log if necessary
+   * @param numberOfMessages The number of messages that are being appended
    */
   private def maybeFlush(numberOfMessages : Int) {
     if(unflushed.addAndGet(numberOfMessages) >= flushInterval)
@@ -472,144 +409,128 @@ private[kafka] class Log(val dir: File, 
   }
 
   /**
-   * Flush this log file to the physical disk
+   * Flush this log file and associated index to the physical disk
    */
   def flush() : Unit = {
     if (unflushed.get == 0)
       return
 
+    debug("Flushing log '" + name + "' last flushed: " + lastFlushTime + " current time: " +
+          time.milliseconds + " unflushed = " + unflushed.get)
     lock synchronized {
-      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
-          time.milliseconds)
-      segments.view.last.flush()
+      activeSegment.flush()
       unflushed.set(0)
       lastflushedTime.set(time.milliseconds)
      }
   }
 
-  def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
-    val segsArray = segments.view
-    var offsetTimeArray: Array[(Long, Long)] = null
-    if(segsArray.last.size > 0)
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
-    else
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
-
-    for(i <- 0 until segsArray.length)
-      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified)
-    if(segsArray.last.size > 0)
-      offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)
-
-    var startIndex = -1
-    timestamp match {
-      case OffsetRequest.LatestTime =>
-        startIndex = offsetTimeArray.length - 1
-      case OffsetRequest.EarliestTime =>
-        startIndex = 0
-      case _ =>
-        var isFound = false
-        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
-        startIndex = offsetTimeArray.length - 1
-        while (startIndex >= 0 && !isFound) {
-          if (offsetTimeArray(startIndex)._2 <= timestamp)
-            isFound = true
-          else
-            startIndex -=1
-        }
-    }
-
-    val retSize = maxNumOffsets.min(startIndex + 1)
-    val ret = new Array[Long](retSize)
-    for(j <- 0 until retSize) {
-      ret(j) = offsetTimeArray(startIndex)._1
-      startIndex -= 1
-    }
-    // ensure that the returned seq is in descending order of offsets
-    ret.toSeq.sortBy(- _)
-  }
-
+  /**
+   * Delete this log from the filesystem entirely
+   */
   def delete(): Unit = {
-    deleteSegments(segments.contents.get())
+    logSegments.foreach(_.delete())
     Utils.rm(dir)
   }
-
-
-  /* Attempts to delete all provided segments from a log and returns how many it was able to */
-  def deleteSegments(segments: Seq[LogSegment]): Int = {
-    var total = 0
-    for(segment <- segments) {
-      info("Deleting log segment " + segment.start + " from " + name)
-      val deletedLog = segment.messageSet.delete()
-      val deletedIndex = segment.index.delete()
-      if(!deletedIndex || !deletedLog) {
-        throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.")
-      } else {
-        total += 1
-      }
-    }
-    total
-  }
   
+  /**
+   * Truncate this log so that it ends with the greatest offset < targetOffset.
+   * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
+   */
   def truncateTo(targetOffset: Long) {
+    info("Truncating log %s to offset %d.".format(name, targetOffset))
     if(targetOffset < 0)
       throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
+    if(targetOffset > logEndOffset) {
+      info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
+      return
+    }
     lock synchronized {
-      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
-      val viewSize = segments.view.size
-      val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
-      /* We should not hit this error because segments.view is locked in markedDeletedWhile() */
-      if(numSegmentsDeleted != segmentsToBeDeleted.size)
-        error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")")
-      if (numSegmentsDeleted == viewSize) {
-        segments.trunc(segments.view.size)
-        rollToOffset(targetOffset)
-        this.nextOffset.set(targetOffset)
+      if(segments.firstEntry.getValue.baseOffset > targetOffset) {
+        truncateFullyAndStartAt(targetOffset)
       } else {
-        if(targetOffset > logEndOffset) {
-          error("Target offset %d cannot be greater than the last message offset %d in the log %s".
-                format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath))
-        } else {
-          // find the log segment that has this hw
-          val segmentToBeTruncated = findRange(segments.view, targetOffset)
-          segmentToBeTruncated match {
-            case Some(segment) =>
-              val truncatedSegmentIndex = segments.view.indexOf(segment)
-              segments.truncLast(truncatedSegmentIndex)
-              segment.truncateTo(targetOffset)
-              this.nextOffset.set(targetOffset)
-              info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset))
-            case None => // nothing to do
-          }
-        }
+        val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
+        deletable.foreach(s => segments.remove(s.baseOffset))
+        deletable.foreach(_.delete())
+        activeSegment.truncateTo(targetOffset)
+        this.nextOffset.set(targetOffset)
       }
     }
   }
     
-    /**
-   *  Truncate all segments in the log and start a new segment on a new offset
+  /**
+   *  Delete all data in the log and start at the new offset
+   *  @param newOffset The new offset to start the log with
    */
-  def truncateAndStartWithNewOffset(newOffset: Long) {
+  def truncateFullyAndStartAt(newOffset: Long) {
+    debug("Truncate and start log '" + name + "' to " + newOffset)
     lock synchronized {
-      val deletedSegments = segments.trunc(segments.view.size)
-      debug("Truncate and start log '" + name + "' to " + newOffset)
-      segments.append(new LogSegment(dir, 
-                                     newOffset,
-                                     indexIntervalBytes = indexIntervalBytes, 
-                                     maxIndexSize = maxIndexSize))
-      deleteSegments(deletedSegments)
+      val segmentsToDelete = logSegments.toList
+      segments.clear()
+      segmentsToDelete.foreach(_.delete())
+      segments.put(newOffset, 
+                   new LogSegment(dir, 
+                                  newOffset,
+                                  indexIntervalBytes = indexIntervalBytes, 
+                                  maxIndexSize = maxIndexSize))
       this.nextOffset.set(newOffset)
     }
   }
 
-  def topicName():String = {
-    name.substring(0, name.lastIndexOf("-"))
-  }
+  /**
+   * The time this log is last known to have been fully flushed to disk
+   */
+  def lastFlushTime(): Long = lastflushedTime.get
+  
+  /**
+   * The active segment that is currently taking appends
+   */
+  def activeSegment = segments.lastEntry.getValue
+  
+  /**
+   * All the log segments in this log ordered from oldest to newest
+   */
+  def logSegments: Iterable[LogSegment] = asIterable(segments.values)
+  
+  override def toString() = "Log(" + this.dir + ")"
+  
+}
 
-  def getLastFlushedTime():Long = {
-    return lastflushedTime.get
+/**
+ * Helper functions for logs
+ */
+object Log {
+  val LogFileSuffix = ".log"
+  val IndexFileSuffix = ".index"
+
+  /**
+   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
+   * so that ls sorts the files numerically.
+   * @param offset The offset to use in the file name
+   * @return The filename
+   */
+  def filenamePrefixFromOffset(offset: Long): String = {
+    val nf = NumberFormat.getInstance()
+    nf.setMinimumIntegerDigits(20)
+    nf.setMaximumFractionDigits(0)
+    nf.setGroupingUsed(false)
+    nf.format(offset)
   }
   
-  override def toString() = "Log(" + this.dir + ")"
+  /**
+   * Construct a log file name in the given dir with the given base offset
+   * @param dir The directory in which the log will reside
+   * @param offset The base offset of the log file
+   */
+  def logFilename(dir: File, offset: Long) = 
+    new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
+  
+  /**
+   * Construct an index file name in the given dir using the given base offset
+   * @param dir The directory in which the log will reside
+   * @param offset The base offset of the log file
+   */
+  def indexFilename(dir: File, offset: Long) = 
+    new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
   
 }
   

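The Log.scala rewrite above drops SegmentList and the Log.findRange binary search in favour of a ConcurrentSkipListMap keyed by each segment's base offset, with read() walking floorEntry/higherEntry to find the right segment. A minimal, self-contained sketch of that lookup pattern, not taken from the patch (the segment names are stand-ins):

import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupSketch {
  def main(args: Array[String]) {
    // base offset -> segment, standing in for the Long -> LogSegment map in Log.scala
    val segments = new ConcurrentSkipListMap[Long, String]()
    segments.put(0L, "00000000000000000000.log")
    segments.put(350L, "00000000000000000350.log")
    segments.put(900L, "00000000000000000900.log")

    // floorEntry returns the entry with the greatest key <= the target offset,
    // i.e. the segment that could contain offset 512
    val entry = segments.floorEntry(512L)
    println(entry.getKey + " -> " + entry.getValue) // 350 -> 00000000000000000350.log

    // if that segment held no messages past the target, read() moves on to the next one
    val next = segments.higherEntry(entry.getKey)
    println(next.getKey + " -> " + next.getValue)   // 900 -> 00000000000000000900.log
  }
}
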
Modified: kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Sun Dec  2 20:50:01 2012
@@ -20,7 +20,6 @@ package kafka.log
 import java.io._
 import kafka.utils._
 import scala.collection._
-import kafka.log.Log._
 import kafka.common.{TopicAndPartition, KafkaException}
 import kafka.server.KafkaConfig
 
@@ -36,9 +35,9 @@ import kafka.server.KafkaConfig
  * A background thread handles log retention by periodically truncating excess log segments.
  */
 @threadsafe
-private[kafka] class LogManager(val config: KafkaConfig,
-                                scheduler: KafkaScheduler,
-                                private val time: Time) extends Logging {
+class LogManager(val config: KafkaConfig,
+                 scheduler: KafkaScheduler,
+                 private val time: Time) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
@@ -62,9 +61,12 @@ private[kafka] class LogManager(val conf
   loadLogs(logDirs)
   
   /**
-   * 1. Ensure that there are no duplicates in the directory list
-   * 2. Create each directory if it doesn't exist
-   * 3. Check that each path is a readable directory 
+   * Create and check validity of the given directories, specifically:
+   * <ol>
+   * <li> Ensure that there are no duplicates in the directory list
+   * <li> Create each directory if it doesn't exist
+   * <li> Check that each path is a readable directory 
+   * </ol>
    */
   private def createAndValidateLogDirs(dirs: Seq[File]) {
     if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
@@ -95,7 +97,7 @@ private[kafka] class LogManager(val conf
   }
   
   /**
-   * Recovery and load all logs in the given data directories
+   * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     for(dir <- dirs) {
@@ -120,8 +122,7 @@ private[kafka] class LogManager(val conf
                               needsRecovery, 
                               config.logIndexMaxSizeBytes,
                               config.logIndexIntervalBytes,
-                              time, 
-                              config.brokerId)
+                              time)
             val previous = this.logs.put(topicPartition, log)
             if(previous != null)
               throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
@@ -132,7 +133,7 @@ private[kafka] class LogManager(val conf
   }
 
   /**
-   *  Start the log flush thread
+   *  Start the background threads to flush logs and do log cleanup
    */
   def startup() {
     /* Schedule the cleanup task to delete old logs */
@@ -140,14 +141,17 @@ private[kafka] class LogManager(val conf
       info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
       scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
       info("Starting log flusher every " + config.flushSchedulerThreadRate +
-                   " ms with the following overrides " + logFlushIntervals)
-      scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
-                                 config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
+           " ms with the following overrides " + logFlushIntervals)
+      scheduler.scheduleWithRate(flushDirtyLogs, 
+                                 "kafka-logflusher-",
+                                 config.flushSchedulerThreadRate, 
+                                 config.flushSchedulerThreadRate, 
+                                 isDaemon = false)
     }
   }
   
   /**
-   * Get the log if it exists
+   * Get the log if it exists, otherwise return None
    */
   def getLog(topic: String, partition: Int): Option[Log] = {
     val topicAndPartiton = TopicAndPartition(topic, partition)
@@ -159,7 +163,7 @@ private[kafka] class LogManager(val conf
   }
 
   /**
-   * Create the log if it does not exist, if it exists just return it
+   * Create the log if it does not exist, otherwise just return it
    */
   def getOrCreateLog(topic: String, partition: Int): Log = {
     val topicAndPartition = TopicAndPartition(topic, partition)
@@ -195,8 +199,7 @@ private[kafka] class LogManager(val conf
                     needsRecovery = false, 
                     config.logIndexMaxSizeBytes, 
                     config.logIndexIntervalBytes, 
-                    time, 
-                    config.brokerId)
+                    time)
       info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
       logs.put(topicAndPartition, log)
       log
@@ -223,14 +226,6 @@ private[kafka] class LogManager(val conf
     }
   }
 
-  def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
-    val log = getLog(topicAndPartition.topic, topicAndPartition.partition)
-    log match {
-      case Some(l) => l.getOffsetsBefore(timestamp, maxNumOffsets)
-      case None => getEmptyOffsets(timestamp)
-    }
-  }
-
   /**
    * Runs through the log removing segments older than a certain age
    */
@@ -238,9 +233,7 @@ private[kafka] class LogManager(val conf
     val startMs = time.milliseconds
     val topic = parseTopicPartitionName(log.name).topic
     val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
-    val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs)
-    val total = log.deleteSegments(toBeDeleted)
-    total
+    log.deleteOldSegments(startMs - _.lastModified > logCleanupThresholdMs)
   }
 
   /**
@@ -250,7 +243,8 @@ private[kafka] class LogManager(val conf
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
     val topic = parseTopicPartitionName(log.dir.getName).topic
     val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
-    if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
+    if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize)
+      return 0
     var diff = log.size - maxLogRetentionSize
     def shouldDelete(segment: LogSegment) = {
       if(diff - segment.size >= 0) {
@@ -260,9 +254,7 @@ private[kafka] class LogManager(val conf
         false
       }
     }
-    val toBeDeleted = log.markDeletedWhile( shouldDelete )
-    val total = log.deleteSegments(toBeDeleted)
-    total
+    log.deleteOldSegments(shouldDelete)
   }
 
   /**
@@ -307,19 +299,20 @@ private[kafka] class LogManager(val conf
    */
   private def flushDirtyLogs() = {
     debug("Checking for dirty logs to flush...")
-    for (log <- allLogs) {
+    for ((topicAndPartition, log) <- logs) {
       try {
-        val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
+        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
+        
         var logFlushInterval = config.defaultFlushIntervalMs
-        if(logFlushIntervals.contains(log.topicName))
-          logFlushInterval = logFlushIntervals(log.topicName)
-        debug(log.topicName + " flush interval  " + logFlushInterval +
-                      " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush)
+        if(logFlushIntervals.contains(topicAndPartition.topic))
+          logFlushInterval = logFlushIntervals(topicAndPartition.topic)
+        debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + logFlushInterval +
+              " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       } catch {
         case e =>
-          error("Error flushing topic " + log.topicName, e)
+          error("Error flushing topic " + topicAndPartition.topic, e)
           e match {
             case _: IOException =>
               fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
@@ -330,11 +323,12 @@ private[kafka] class LogManager(val conf
     }
   }
 
+  /**
+   * Parse the topic and partition out of the directory name of a log
+   */
   private def parseTopicPartitionName(name: String): TopicAndPartition = {
     val index = name.lastIndexOf('-')
     TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
   }
 
-  def topics(): Iterable[String] = logs.keys.map(_.topic)
-
 }
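
For reference, the size-based retention pass in cleanupSegmentsToMaintainSize can be read as a predicate-driven scan over the segments, oldest first: a segment is deletable only while removing it still leaves the log at or above its retention limit. A minimal standalone sketch of that predicate follows; Seg and the sample sizes are illustrative stand-ins, not kafka.log.LogSegment.

    // Standalone sketch of size-based retention: delete the oldest segments while the
    // log would still be at or above its retention limit without them. Seg is a
    // simplified stand-in, not kafka.log.LogSegment.
    object RetentionSketch {
      case class Seg(baseOffset: Long, size: Long)

      def segmentsToDelete(segments: Seq[Seg], logSize: Long, retentionSize: Long): Seq[Seg] = {
        if (retentionSize < 0 || logSize < retentionSize)
          return Seq.empty
        var diff = logSize - retentionSize
        // mirror the shouldDelete predicate: a segment is deletable only if removing it
        // keeps the remaining size at or above the retention threshold
        segments.takeWhile { s =>
          if (diff - s.size >= 0) { diff -= s.size; true } else false
        }
      }

      def main(args: Array[String]) {
        val segs = Seq(Seg(0, 100), Seg(100, 100), Seg(200, 50))
        println(segmentsToDelete(segs, logSize = 250, retentionSize = 120)) // List(Seg(0,100))
      }
    }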

Modified: kafka/trunk/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/LogSegment.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/LogSegment.scala Sun Dec  2 20:50:01 2012
@@ -3,6 +3,7 @@ package kafka.log
 import scala.math._
 import java.io.File
 import kafka.message._
+import kafka.common._
 import kafka.utils._
 
 /**
@@ -12,24 +13,24 @@ import kafka.utils._
  * any previous segment.
  * 
  * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. 
+ * 
+ * @param log The message set containing log entries
+ * @param index The offset index
+ * @param baseOffset A lower bound on the offsets in this segment
+ * @param indexIntervalBytes The approximate number of bytes between entries in the index
+ * @param time The time instance
  */
 @nonthreadsafe
-class LogSegment(val messageSet: FileMessageSet, 
+class LogSegment(val log: FileMessageSet, 
                  val index: OffsetIndex, 
-                 val start: Long, 
+                 val baseOffset: Long, 
                  val indexIntervalBytes: Int,
-                 time: Time) extends Range with Logging {
+                 time: Time) extends Logging {
   
-  var firstAppendTime: Option[Long] =
-    if (messageSet.sizeInBytes > 0)
-      Some(time.milliseconds)
-    else
-      None
+  var created = time.milliseconds
   
   /* the number of bytes since we last added an entry in the offset index */
-  var bytesSinceLastIndexEntry = 0
-  
-  @volatile var deleted = false
+  private var bytesSinceLastIndexEntry = 0
   
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = 
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
@@ -39,49 +40,62 @@ class LogSegment(val messageSet: FileMes
          SystemTime)
     
   /* Return the size in bytes of this log segment */
-  def size: Long = messageSet.sizeInBytes()
-
-  def updateFirstAppendTime() {
-    if (firstAppendTime.isEmpty)
-      firstAppendTime = Some(time.milliseconds)
-  }
-
+  def size: Long = log.sizeInBytes()
+  
   /**
    * Append the given messages starting with the given offset. Add
    * an entry to the index if needed.
    * 
-   * It is assumed this method is being called from within a lock
+   * It is assumed this method is being called from within a lock.
+   * 
+   * @param offset The first offset in the message set.
+   * @param messages The messages to append.
    */
+  @nonthreadsafe
   def append(offset: Long, messages: ByteBufferMessageSet) {
     if (messages.sizeInBytes > 0) {
-      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes()))
+      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
-        index.append(offset, messageSet.sizeInBytes())
+        index.append(offset, log.sizeInBytes())
         this.bytesSinceLastIndexEntry = 0
       }
       // append the messages
-      messageSet.append(messages)
-      updateFirstAppendTime()
+      log.append(messages)
       this.bytesSinceLastIndexEntry += messages.sizeInBytes
     }
   }
   
   /**
-   * Find the physical file position for the least offset >= the given offset. If no offset is found
-   * that meets this criteria before the end of the log, return null.
+   * Find the physical file position for the first message with offset >= the requested offset.
+   * 
+   * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
+   * in the file higher than the greatest lower bound from the index.
+   * 
+   * @param offset The offset we want to translate
+   * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
+   * when omitted, the search will begin at the position in the offset index.
+   * 
+   * @return The position in the log storing the message with the least offset >= the requested offset, or null if no message meets this criterion.
    */
-  private def translateOffset(offset: Long): OffsetPosition = {
+  @threadsafe
+  private def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
     val mapping = index.lookup(offset)
-    messageSet.searchFor(offset, mapping.position)
+    log.searchFor(offset, max(mapping.position, startingFilePosition))
   }
   
   /**
-   * Read a message set from this segment beginning with the first offset
-   * greater than or equal to the startOffset. The message set will include
+   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
+   * 
+   * @param startOffset A lower bound on the first offset to include in the message set we read
+   * @param maxSize The maximum number of bytes to include in the message set we read
+   * @param maxOffset An optional maximum offset for the message set we read
+   * 
+   * @return The message set read or null if the startOffset is larger than the largest offset in this log.
    */
-  def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
+  @threadsafe
+  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
     if(maxSize < 0)
       throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
     if(maxSize == 0)
@@ -89,9 +103,9 @@ class LogSegment(val messageSet: FileMes
       
     val startPosition = translateOffset(startOffset)
     
-    // if the start position is already off the end of the log, return MessageSet.Empty
+    // if the start position is already off the end of the log, return null
     if(startPosition == null)
-      return MessageSet.Empty
+      return null
     
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
     val length = 
@@ -103,23 +117,58 @@ class LogSegment(val messageSet: FileMes
           // there is a max offset, translate it to a file position and use that to calculate the max read size
           if(offset < startOffset)
             throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
-          val mapping = translateOffset(offset)
+          val mapping = translateOffset(offset, startPosition.position)
           val endPosition = 
             if(mapping == null)
-              messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
+              log.sizeInBytes() // the max offset is off the end of the log, use the end of the file
             else
               mapping.position
           min(endPosition - startPosition.position, maxSize) 
         }
       }
-    messageSet.read(startPosition.position, length)
+    log.read(startPosition.position, length)
+  }
+  
+  /**
+   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
+   * 
+   * @param maxMessageSize A bound on the memory allocation in the case of a corrupt message size--we will assume any message larger than this
+   * is corrupt.
+   */
+  @nonthreadsafe
+  def recover(maxMessageSize: Int) {
+    index.truncate()
+    var validBytes = 0
+    var lastIndexEntry = 0
+    val iter = log.iterator(maxMessageSize)
+    try {
+      while(iter.hasNext) {
+        val entry = iter.next
+        entry.message.ensureValid()
+        if(validBytes - lastIndexEntry > indexIntervalBytes) {
+          index.append(entry.offset, validBytes)
+          lastIndexEntry = validBytes
+        }
+        validBytes += MessageSet.entrySize(entry.message)
+      }
+    } catch {
+      case e: InvalidMessageException => 
+        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
+    }
+    val truncated = log.sizeInBytes - validBytes
+    if(truncated > 0)
+      warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(log.file.getAbsolutePath))
+    log.truncateTo(validBytes)
   }
 
-  override def toString() = "LogSegment(start=" + start + ", size=" + size + ")"
+  override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
 
   /**
-   * Truncate off all index and log entries with offsets greater than or equal to the current offset. 
+   * Truncate off all index and log entries with offsets >= the given offset.
+   * If the given offset is larger than the largest message in this segment, do nothing.
+   * @param offset The offset to truncate to
    */
+  @nonthreadsafe
   def truncateTo(offset: Long) {
     val mapping = translateOffset(offset)
     if(mapping == null)
@@ -127,29 +176,37 @@ class LogSegment(val messageSet: FileMes
     index.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently  active) index
     index.resize(index.maxIndexSize)
-    messageSet.truncateTo(mapping.position)
-    if (messageSet.sizeInBytes == 0)
-      firstAppendTime = None
+    log.truncateTo(mapping.position)
+    if (log.sizeInBytes == 0)
+      created = time.milliseconds
   }
   
   /**
    * Calculate the offset that would be used for the next message to be append to this segment.
-   * Not that this is expensive.
+   * Note that this is expensive.
    */
+  @threadsafe
   def nextOffset(): Long = {
-    val ms = read(index.lastOffset, messageSet.sizeInBytes, None)
-    ms.lastOption match {
-      case None => start
-      case Some(last) => last.nextOffset
+    val ms = read(index.lastOffset, None, log.sizeInBytes)
+    if(ms == null) {
+      baseOffset
+    } else {
+      ms.lastOption match {
+        case None => baseOffset
+        case Some(last) => last.nextOffset
+      }
     }
   }
   
   /**
    * Flush this log segment to disk
    */
+  @threadsafe
   def flush() {
-    messageSet.flush()
-    index.flush()
+    LogFlushStats.logFlushTimer.time {
+      log.flush()
+      index.flush()
+    }
   }
   
   /**
@@ -157,7 +214,25 @@ class LogSegment(val messageSet: FileMes
    */
   def close() {
     Utils.swallow(index.close)
-    Utils.swallow(messageSet.close)
+    Utils.swallow(log.close)
   }
   
+  /**
+   * Delete this log segment from the filesystem.
+   * @throws KafkaStorageException if the delete fails.
+   */
+  def delete() {
+    val deletedLog = log.delete()
+    val deletedIndex = index.delete()
+    if(!deletedLog && log.file.exists)
+      throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
+    if(!deletedIndex && index.file.exists)
+      throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
+  }
+  
+  /**
+   * The last modified time of this log segment as a unix time stamp
+   */
+  def lastModified = log.file.lastModified
+
 }
\ No newline at end of file
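
The read path above is a two-step translation: the offset index supplies the greatest entry at or below the requested offset, and the log is then scanned forward from that file position to the first message with an offset at or above the target. A simplified in-memory sketch of that lookup, where IndexEntry and LogEntry stand in for the mmap-backed OffsetIndex and FileMessageSet:

    // Simplified sketch of the two-step lookup used by LogSegment.read: take the
    // greatest index entry <= the target offset, then scan the log forward from that
    // file position. These in-memory structures are stand-ins, not Kafka's
    // mmap-backed OffsetIndex or FileMessageSet.
    object OffsetLookupSketch {
      case class IndexEntry(offset: Long, position: Int)
      case class LogEntry(offset: Long, position: Int, sizeInBytes: Int)

      // greatest index entry with offset <= target, or (baseOffset, 0) if none exists
      def lookup(index: Seq[IndexEntry], baseOffset: Long, target: Long): IndexEntry =
        index.takeWhile(_.offset <= target).lastOption.getOrElse(IndexEntry(baseOffset, 0))

      // scan forward from the index hint to the first entry with offset >= target
      def translate(log: Seq[LogEntry], hint: IndexEntry, target: Long): Option[LogEntry] =
        log.dropWhile(_.position < hint.position).find(_.offset >= target)

      def main(args: Array[String]) {
        val log = Seq(LogEntry(10, 0, 50), LogEntry(11, 50, 50), LogEntry(12, 100, 50))
        val index = Seq(IndexEntry(10, 0), IndexEntry(12, 100))
        val hint = lookup(index, baseOffset = 10, target = 11)
        println(translate(log, hint, target = 11)) // Some(LogEntry(11,50,50))
      }
    }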

Modified: kafka/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala Sun Dec  2 20:50:01 2012
@@ -51,7 +51,7 @@ import kafka.utils._
  */
 class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
-  /* the memory mapping */
+  /* initialize the memory mapping for this index */
   private var mmap: MappedByteBuffer = 
     {
       val newlyCreated = file.createNewFile()
@@ -84,10 +84,12 @@ class OffsetIndex(val file: File, val ba
       }
     }
   
-  /* the maximum number of entries this index can hold */
+  /**
+   * The maximum number of eight-byte entries this index can hold
+   */
   def maxEntries = mmap.limit / 8
   
-  /* the number of entries in the index */
+  /* the number of eight-byte entries currently in the index */
   private var size = new AtomicInteger(mmap.position / 8)
   
   /* the last offset in the index */
@@ -108,6 +110,10 @@ class OffsetIndex(val file: File, val ba
   /**
    * Find the largest offset less than or equal to the given targetOffset 
    * and return a pair holding this offset and it's corresponding physical file position.
+   * 
+   * @param targetOffset The offset to look up.
+   * 
+   * @return The offset found and the corresponding file position for this offset. 
    * If the target offset is smaller than the least entry in the index (or the index is empty),
    * the pair (baseOffset, 0) is returned.
    */
@@ -123,7 +129,11 @@ class OffsetIndex(val file: File, val ba
   /**
    * Find the slot in which the largest offset less than or equal to the given
    * target offset is stored.
-   * Return -1 if the least entry in the index is larger than the target offset or the index is empty
+   * 
+   * @param idx The index buffer
+   * @param targetOffset The offset to look for
+   * 
+   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
    */
   private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
     // we only store the difference from the baseoffset so calculate that
@@ -161,6 +171,8 @@ class OffsetIndex(val file: File, val ba
   
   /**
    * Get the nth offset mapping from the index
+   * @param n The entry number in the index
+   * @return The offset/position pair at that entry
    */
   def entry(n: Int): OffsetPosition = {
     if(n >= entries)
@@ -170,7 +182,7 @@ class OffsetIndex(val file: File, val ba
   }
   
   /**
-   * Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
+   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
    */
   def append(offset: Long, position: Int) {
     this synchronized {
@@ -192,7 +204,7 @@ class OffsetIndex(val file: File, val ba
   def isFull: Boolean = entries >= this.maxEntries
   
   /**
-   * Truncate the entire index
+   * Truncate the entire index, deleting all entries
    */
   def truncate() = truncateTo(this.baseOffset)
   

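indexSlotFor searches the mapped buffer for the slot holding the largest offset less than or equal to the target, with offsets stored relative to baseOffset at eight bytes per entry. A standalone sketch of that greatest-lower-bound search over a plain array of (relativeOffset, position) pairs, which are assumed here rather than decoded from a memory map:

    // Sketch of the slot search contract: in an array of (relativeOffset, position)
    // pairs, find the slot holding the largest offset <= the target, or -1 if the
    // smallest entry is already larger (or the index is empty). This operates on a
    // plain Array, not the memory-mapped buffer used by OffsetIndex.
    object IndexSlotSketch {
      def slotFor(entries: Array[(Int, Int)], relativeTarget: Int): Int = {
        if (entries.isEmpty || entries(0)._1 > relativeTarget)
          return -1
        var lo = 0
        var hi = entries.length - 1
        // classic binary search for the greatest lower bound
        while (lo < hi) {
          val mid = (lo + hi + 1) / 2
          if (entries(mid)._1 <= relativeTarget) lo = mid else hi = mid - 1
        }
        lo
      }

      def main(args: Array[String]) {
        val idx = Array((0, 0), (4, 4096), (9, 8192))
        println(slotFor(idx, 6))  // 1 -> entry (4, 4096)
        println(slotFor(idx, -1)) // -1
      }
    }
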
Modified: kafka/trunk/core/src/main/scala/kafka/log/package.html
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/package.html?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/package.html (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/package.html Sun Dec  2 20:50:01 2012
@@ -1 +1,6 @@
-The log management system for Kafka.
\ No newline at end of file
+The log management system for Kafka.
+
+The entry point for this system is LogManager. LogManager is responsible for holding all the logs and handing them out by topic/partition. It also enforces the
+flush and retention policies.
+
+The Log itself is made up of log segments. Each log segment consists of a FileMessageSet that contains the data and an OffsetIndex that supports reading the log by offset.
\ No newline at end of file
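
A structural sketch of the containment described above, using placeholder case classes rather than the real Kafka classes:

    // A LogManager hands out Logs by topic/partition, and each Log is a sequence of
    // segments pairing a data file with its offset index. Placeholder types only.
    object LogSubsystemSketch {
      case class FileMessageSet(path: String)   // the [baseOffset].log data file
      case class OffsetIndex(path: String)      // the [baseOffset].index file
      case class LogSegment(log: FileMessageSet, index: OffsetIndex, baseOffset: Long)
      case class Log(dir: String, segments: Seq[LogSegment])
      case class TopicAndPartition(topic: String, partition: Int)

      class LogManager(logs: Map[TopicAndPartition, Log]) {
        def getLog(topic: String, partition: Int): Option[Log] =
          logs.get(TopicAndPartition(topic, partition))
      }

      def main(args: Array[String]) {
        val segment = LogSegment(FileMessageSet("00000000000000000000.log"),
                                 OffsetIndex("00000000000000000000.index"), 0L)
        val manager = new LogManager(Map(TopicAndPartition("events", 0) -> Log("events-0", Seq(segment))))
        println(manager.getLog("events", 0).map(_.segments.size)) // Some(1)
      }
    }

The real classes are file-backed and add the locking, recovery, flushing and retention logic shown in the diffs above.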

Modified: kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala Sun Dec  2 20:50:01 2012
@@ -92,15 +92,23 @@ abstract class MessageSet extends Iterab
   }
   
   /**
-   * Print this message set's contents
+   * Print this message set's contents. If the message set has more than 100 messages, just
+   * print the first 100.
    */
   override def toString: String = {
     val builder = new StringBuilder()
     builder.append(getClass.getSimpleName + "(")
-    for(message <- this) {
+    val iter = this.iterator
+    var i = 0
+    while(iter.hasNext && i < 100) {
+      val message = iter.next
       builder.append(message)
-      builder.append(", ")
+      if(iter.hasNext)
+        builder.append(", ")
+      i += 1
     }
+    if(iter.hasNext)
+      builder.append("...")
     builder.append(")")
     builder.toString
   }
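
The bounded toString above prints at most the first 100 messages and appends an ellipsis when more remain; the same pattern in a generic form (render and the sample inputs are illustrative, not part of MessageSet):

    // A generic version of the bounded toString: render at most the first max
    // elements of an iterable and append "..." if more remain.
    object BoundedToString {
      def render[A](items: Iterable[A], max: Int = 100): String = {
        val it = items.iterator
        val builder = new StringBuilder("(")
        var i = 0
        while (it.hasNext && i < max) {
          builder.append(it.next())
          if (it.hasNext) builder.append(", ")
          i += 1
        }
        if (it.hasNext) builder.append("...")
        builder.append(")")
        builder.toString
      }

      def main(args: Array[String]) {
        println(render(1 to 3))      // (1, 2, 3)
        println(render(1 to 500, 5)) // (1, 2, 3, 4, 5, ...)
      }
    }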

Modified: kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Sun Dec  2 20:50:01 2012
@@ -75,14 +75,14 @@ class ProducerConfig private (val props:
   val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
 
   /**
-   * The producer using the zookeeper software load balancer maintains a ZK cache that gets
-   * updated by the zookeeper watcher listeners. During some events like a broker bounce, the
-   * producer ZK cache can get into an inconsistent state, for a small time period. In this time
-   * period, it could end up picking a broker partition that is unavailable. When this happens, the
-   * ZK cache needs to be updated.
-   * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
+   * If a request fails, the producer can automatically retry; this setting controls how many times it will do so.
+   * Note that not every error means the message was lost--for example, if the network connection drops we will
+   * get a socket exception without knowing whether the write succeeded--so enabling retries can result in duplicate messages.
    */
   val producerRetries = props.getInt("producer.num.retries", 3)
 
+  /**
+   * The amount of time to wait in between retries
+   */
   val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
 }
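
The retry behaviour documented above is driven by two plain properties, producer.num.retries (default 3) and producer.retry.backoff.ms (default 100). A minimal sketch of setting them; how the Properties object is handed to the producer is left out here:

    // Minimal sketch of tuning the retry behaviour via the two properties this class
    // reads. Only the property names shown in ProducerConfig above are used; wiring
    // the Properties into a producer instance is omitted.
    import java.util.Properties

    object RetryConfigSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("producer.num.retries", "5")        // retry a failed request up to 5 times
        props.put("producer.retry.backoff.ms", "200") // wait 200 ms between attempts
        println(props)
      }
    }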

Modified: kafka/trunk/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/KafkaApis.scala Sun Dec  2 20:50:01 2012
@@ -21,6 +21,7 @@ import kafka.admin.{CreateTopicCommand, 
 import kafka.api._
 import kafka.message._
 import kafka.network._
+import kafka.log._
 import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
@@ -59,7 +60,7 @@ class KafkaApis(val requestChannel: Requ
         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
-        case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
+        case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
       case e: Throwable =>
@@ -243,12 +244,17 @@ class KafkaApis(val requestChannel: Requ
       try {
         val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
         val log = localReplica.log.get
-        val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+        val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+        
+        // update stats
+        BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).messagesInRate.mark(info.count)
+        BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(info.count)
+        
         // we may need to increment high watermark since ISR could be down to 1
         localReplica.partition.maybeIncrementLeaderHW(localReplica)
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
-              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
-        ProduceResult(topicAndPartition, start, end)
+              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
+        ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset)
       } catch {
         case e: KafkaStorageException =>
           fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
@@ -358,11 +364,11 @@ class KafkaApis(val requestChannel: Requ
     else
       replicaManager.getLeaderReplicaIfLocal(topic, partition)
     trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val maxOffsetOpt = if (fromReplicaId == Request.OrdinaryConsumerId) {
-      Some(localReplica.highWatermark)
-    } else {
-      None
-    }
+    val maxOffsetOpt = 
+      if (fromReplicaId == Request.OrdinaryConsumerId)
+        Some(localReplica.highWatermark)
+      else
+        None
     val messages = localReplica.log match {
       case Some(log) =>
         log.read(offset, maxSize, maxOffsetOpt)
@@ -391,15 +397,18 @@ class KafkaApis(val requestChannel: Requ
         else
           replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition)
         val offsets = {
-          val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition,
-                                                                partitionOffsetRequestInfo.time,
-                                                                partitionOffsetRequestInfo.maxNumOffsets)
-          if (!offsetRequest.isFromOrdinaryClient) allOffsets
-          else {
+          val allOffsets = fetchOffsets(replicaManager.logManager,
+                                        topicAndPartition,
+                                        partitionOffsetRequestInfo.time,
+                                        partitionOffsetRequestInfo.maxNumOffsets)
+          if (!offsetRequest.isFromOrdinaryClient) {
+            allOffsets
+          } else {
             val hw = localReplica.highWatermark
             if (allOffsets.exists(_ > hw))
               hw +: allOffsets.dropWhile(_ > hw)
-            else allOffsets
+            else 
+              allOffsets
           }
         }
         (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
@@ -412,6 +421,59 @@ class KafkaApis(val requestChannel: Requ
     val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
+  
+  def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+    logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match {
+      case Some(log) => 
+        fetchOffsetsBefore(log, timestamp, maxNumOffsets)
+      case None => 
+        if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
+          Seq(0L)
+        else
+          Nil
+    }
+  }
+  
+  def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+    val segsArray = log.logSegments.toArray
+    var offsetTimeArray: Array[(Long, Long)] = null
+    if(segsArray.last.size > 0)
+      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
+    else
+      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
+
+    for(i <- 0 until segsArray.length)
+      offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
+    if(segsArray.last.size > 0)
+      offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
+
+    var startIndex = -1
+    timestamp match {
+      case OffsetRequest.LatestTime =>
+        startIndex = offsetTimeArray.length - 1
+      case OffsetRequest.EarliestTime =>
+        startIndex = 0
+      case _ =>
+        var isFound = false
+        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
+        startIndex = offsetTimeArray.length - 1
+        while (startIndex >= 0 && !isFound) {
+          if (offsetTimeArray(startIndex)._2 <= timestamp)
+            isFound = true
+          else
+            startIndex -= 1
+        }
+    }
+
+    val retSize = maxNumOffsets.min(startIndex + 1)
+    val ret = new Array[Long](retSize)
+    for(j <- 0 until retSize) {
+      ret(j) = offsetTimeArray(startIndex)._1
+      startIndex -= 1
+    }
+    // ensure that the returned seq is in descending order of offsets
+    ret.toSeq.sortBy(- _)
+  }
 
   /**
    * Service the topic metadata request API
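
fetchOffsetsBefore above builds one (baseOffset, lastModified) pair per segment, appends the log end offset stamped with the current time when the last segment is non-empty, and then walks backwards from the newest entry whose timestamp is at or below the requested one. A standalone sketch of that selection step; the sentinel values and the pre-built array are assumptions here, and the segment scan is elided:

    // Standalone sketch of the selection step in fetchOffsetsBefore: given each
    // segment's (baseOffset, lastModified) pair, plus the log end offset stamped
    // with "now", return up to maxNumOffsets offsets whose timestamp is <= the
    // requested one, newest first.
    object FetchOffsetsSketch {
      val LatestTime = -1L
      val EarliestTime = -2L

      def offsetsBefore(offsetTime: Array[(Long, Long)], timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
        val startIndex = timestamp match {
          case LatestTime   => offsetTime.length - 1
          case EarliestTime => 0
          case t            => offsetTime.lastIndexWhere(_._2 <= t)
        }
        if (startIndex < 0)
          Seq.empty
        else
          (startIndex to math.max(0, startIndex - maxNumOffsets + 1) by -1).map(i => offsetTime(i)._1)
      }

      def main(args: Array[String]) {
        //               (baseOffset, lastModifiedMs)
        val segs = Array((0L, 1000L), (100L, 2000L), (200L, 3000L))
        println(offsetsBefore(segs, 2500L, 2))      // Vector(100, 0)
        println(offsetsBefore(segs, LatestTime, 1)) // Vector(200)
      }
    }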

Modified: kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Sun Dec  2 20:50:01 2012
@@ -38,7 +38,7 @@ class KafkaServer(val config: KafkaConfi
   var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
   var replicaManager: ReplicaManager = null
-  private var apis: KafkaApis = null
+  var apis: KafkaApis = null
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(4)
   var zkClient: ZkClient = null

Modified: kafka/trunk/core/src/main/scala/kafka/server/LeaderElector.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/LeaderElector.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/LeaderElector.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/LeaderElector.scala Sun Dec  2 20:50:01 2012
@@ -27,8 +27,6 @@ trait LeaderElector extends Logging {
 
   def amILeader : Boolean
 
-//  def electAndBecomeLeader: Unit
-//
   def elect: Boolean
 
   def close

Modified: kafka/trunk/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Sun Dec  2 20:50:01 2012
@@ -66,7 +66,7 @@ class ReplicaFetcherThread(name:String,
     )
     val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
-    replica.log.get.truncateAndStartWithNewOffset(offset)
+    replica.log.get.truncateFullyAndStartAt(offset)
     offset
   }
 

Modified: kafka/trunk/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/tools/KafkaMigrationTool.java?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/tools/KafkaMigrationTool.java (original)
+++ kafka/trunk/core/src/main/scala/kafka/tools/KafkaMigrationTool.java Sun Dec  2 20:50:01 2012
@@ -54,7 +54,7 @@ import java.util.Properties;
  * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
  * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
  */
-
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class KafkaMigrationTool
 {
   private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());

Modified: kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala Sun Dec  2 20:50:01 2012
@@ -17,27 +17,24 @@
 
 package kafka.utils;
 
+import java.util.Random
 import scala.math._
 
-object Throttler extends Logging {
-  val DefaultCheckIntervalMs = 100L
-}
-
 /**
  * A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second
  * (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for 
- * an appropraite amount of time when maybeThrottle() is called to attain the desired rate.
+ * an appropriate amount of time when maybeThrottle() is called to attain the desired rate.
  * 
  * @param desiredRatePerSec: The rate we want to hit in units/sec
  * @param checkIntervalMs: The interval at which to check our rate
  * @param throttleDown: Does throttling increase or decrease our rate?
  * @param time: The time implementation to use
  */
-@nonthreadsafe
+@threadsafe
 class Throttler(val desiredRatePerSec: Double, 
                 val checkIntervalMs: Long, 
                 val throttleDown: Boolean, 
-                val time: Time) {
+                val time: Time) extends Logging {
   
   private val lock = new Object
   private var periodStartNs: Long = time.nanoseconds
@@ -65,8 +62,7 @@ class Throttler(val desiredRatePerSec: D
           val elapsedMs = elapsedNs / Time.NsPerMs
           val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
           if(sleepTime > 0) {
-            Throttler.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + 
-                                     ", sleeping for " + sleepTime + " ms to compensate.")
+            println("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
             time.sleep(sleepTime)
           }
         }
@@ -77,3 +73,26 @@ class Throttler(val desiredRatePerSec: D
   }
   
 }
+
+object Throttler {
+  
+  val DefaultCheckIntervalMs = 100L
+  
+  def main(args: Array[String]) {
+    val rand = new Random()
+    val throttler = new Throttler(1000000, 100, true, SystemTime)
+    var start = System.currentTimeMillis
+    var total = 0
+    while(true) {
+      val value = rand.nextInt(1000)
+      throttler.maybeThrottle(value)
+      total += value
+      val now = System.currentTimeMillis
+      if(now - start >= 1000) {
+        println(total)
+        start = now
+        total = 0
+      }
+    }
+  }
+}
\ No newline at end of file
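
The sleep computed in maybeThrottle is the difference between how long the observed units should have taken at the desired rate and how long they actually took. A worked sketch of that arithmetic with illustrative numbers:

    // Worked sketch of the sleep computation in maybeThrottle: if observedSoFar units
    // arrived in elapsedMs milliseconds but the target is desiredRatePerSec units per
    // second, sleep long enough for the overall rate to fall back to the target.
    object ThrottleMathSketch {
      def sleepTimeMs(observedSoFar: Double, elapsedMs: Double, desiredRatePerSec: Double): Long = {
        val desiredRateMs = desiredRatePerSec / 1000.0
        math.round(observedSoFar / desiredRateMs - elapsedMs) max 0L
      }

      def main(args: Array[String]) {
        // 200,000 units in 100 ms against a target of 1,000,000 units/sec:
        // they should have taken 200 ms, so sleep the remaining 100 ms
        println(sleepTimeMs(200000, 100, 1000000)) // 100
      }
    }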

Modified: kafka/trunk/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/admin/AdminTest.scala Sun Dec  2 20:50:01 2012
@@ -401,8 +401,6 @@ class AdminTest extends JUnit3Suite with
     assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
 
     servers.foreach(_.shutdown())
-
-
   }
 
   private def checkIfReassignPartitionPathExists(): Boolean = {


