kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1395729 [2/4] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/...
Date Mon, 08 Oct 2012 19:13:27 GMT
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import scala.math._
+import java.io._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+import kafka.utils._
+
+/**
+ * An index that maps logical offsets to physical file locations for a particular log segment. This index may be sparse:
+ * that is, it may not hold an entry for every message in the log.
+ * 
+ * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
+ * 
+ * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant
+ * to locate the offset/location pair for the greatest offset less than or equal to the target offset.
+ * 
+ * Index files can be opened in two ways: either as an empty, mutable index that allows appends or
+ * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an 
+ * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
+ * 
+ * No attempt is made to checksum the contents of this file; in the event of a crash it is rebuilt.
+ * 
+ * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the 
+ * message with that offset. The offset stored is relative to the base offset of the index file. So, for example,
+ * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use
+ * only 4 bytes for the offset.
+ * 
+ * The frequency of entries is up to the user of this class.
+ * 
+ * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
+ * storage format.
+ */
+class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, maxIndexSize: Int = -1) extends Logging {
+
+  /* the memory mapping */
+  private var mmap: MappedByteBuffer = 
+    {
+      val newlyCreated = file.createNewFile()
+      val raf = new RandomAccessFile(file, "rw")
+      try {
+        if(mutable) {
+          /* if mutable create and memory map a new sparse file */
+          if(maxIndexSize < 8)
+            throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+          
+          /* pre-allocate the file if necessary */
+          if(newlyCreated)
+            raf.setLength(roundToExactMultiple(maxIndexSize, 8))
+          val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
+          
+          /* set the position in the index for the next entry */
+          if(newlyCreated)
+            idx.position(0)
+          else
+            // if this is a pre-existing index, assume it is all valid and set position to last entry
+            idx.position(roundToExactMultiple(idx.limit, 8))
+          idx
+        } else {
+          /* if not mutable, just mmap what they gave us */
+          val len = raf.length()
+          if(len < 0 || len % 8 != 0)
+            throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
+                                            " bytes which is negative or not a multiple of 8.")
+          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
+        }
+      } finally {
+        Utils.swallow(raf.close())
+      }
+    }
+  
+  /* the maximum number of entries this index can hold */
+  val maxEntries = mmap.limit / 8
+  
+  /* the number of entries in the index */
+  private var size = if(mutable) new AtomicInteger(mmap.position / 8) else new AtomicInteger(mmap.limit / 8)
+  
+  /* the last offset in the index */
+  var lastOffset = readLastOffset()
+  
+  /**
+   * The last logical offset written to the index
+   */
+  private def readLastOffset(): Long = {
+    val offset = 
+      size.get match {
+        case 0 => 0
+        case s => logical(this.mmap, s-1)
+      }
+    baseOffset + offset
+  }
+
+  /**
+   * Find the largest offset less than or equal to the given targetOffset 
+   * and return a pair holding this logical offset and its corresponding physical file position.
+   * If the target offset is smaller than the least entry in the index (or the index is empty),
+   * the pair (baseOffset, 0) is returned.
+   */
+  def lookup(targetOffset: Long): OffsetPosition = {
+    if(entries == 0)
+      return OffsetPosition(baseOffset, 0)
+    val idx = mmap.duplicate
+    val slot = indexSlotFor(idx, targetOffset)
+    if(slot == -1)
+      OffsetPosition(baseOffset, 0)
+    else
+      OffsetPosition(baseOffset + logical(idx, slot), physical(idx, slot))
+  }
+  
+  /**
+   * Find the slot in which the largest offset less than or equal to the given
+   * target offset is stored.
+   * Return -1 if the least entry in the index is larger than the target offset 
+   */
+  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
+    // we only store the difference from the base offset, so calculate that first
+    val relativeOffset = targetOffset - baseOffset
+    
+    // check if the target offset is smaller than the least offset
+    if(logical(idx, 0) > relativeOffset)
+      return -1
+    
+    // binary search for the entry
+    var lo = 0
+    var hi = entries-1
+    while(lo < hi) {
+      val mid = ceil((hi + lo) / 2.0).toInt
+      val found = logical(idx, mid)
+      if(found == relativeOffset)
+        return mid
+      else if(found < relativeOffset)
+        lo = mid
+      else
+        hi = mid - 1
+    }
+    return lo
+  }
+  
+  /* return the nth logical offset relative to the base offset */
+  private def logical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
+  
+  /* return the nth physical offset */
+  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
+  
+  /**
+   * Get the nth offset mapping from the index
+   */
+  def entry(n: Int): OffsetPosition = {
+    if(n >= entries)
+      throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+    val idx = mmap.duplicate
+    // translate the stored relative offset back into a full logical offset
+    OffsetPosition(baseOffset + logical(idx, n), physical(idx, n))
+  }
+  
+  /**
+   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previously appended entries.
+   */
+  def append(logicalOffset: Long, position: Int) {
+    this synchronized {
+      if(!mutable)
+        throw new IllegalStateException("Attempt to append to an immutable offset index " + file.getName)
+      if(isFull)
+        throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").")
+      if(size.get > 0 && logicalOffset <= lastOffset)
+        throw new IllegalArgumentException("Attempt to append an offset (" + logicalOffset + ") no larger than the last offset appended (" + lastOffset + ").")
+      debug("Adding index entry %d => %d to %s.".format(logicalOffset, position, file.getName))
+      this.mmap.putInt((logicalOffset - baseOffset).toInt)
+      this.mmap.putInt(position)
+      this.size.incrementAndGet()
+      this.lastOffset = logicalOffset
+    }
+  }
+  
+  /**
+   * True iff there are no more slots available in this index
+   */
+  def isFull: Boolean = entries >= this.maxEntries
+  
+  /**
+   * Truncate the entire index
+   */
+  def truncate() = truncateTo(this.baseOffset)
+  
+  /**
+   * Remove all entries from the index which have an offset greater than or equal to the given offset.
+   * Truncating to an offset larger than the largest in the index has no effect.
+   */
+  def truncateTo(offset: Long) {
+    this synchronized {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, offset)
+
+      /* There are 3 cases for choosing the new size
+       * 1) if there is no entry in the index <= the offset, delete everything
+       * 2) if there is an entry for this exact offset, delete it and everything larger than it
+       * 3) if there is no entry for this offset, delete everything larger than the next smallest
+       */
+      val newEntries = 
+        if(slot < 0)
+          0
+        else if(logical(idx, slot) == offset - baseOffset)
+          slot
+        else
+          slot + 1
+      this.size.set(newEntries)
+      mmap.position(this.size.get * 8)
+      this.lastOffset = readLastOffset
+    }
+  }
+  
+  /**
+   * Make this segment read-only, flush any unsaved changes, and truncate any excess bytes
+   */
+  def makeReadOnly() {
+    this synchronized {
+      mutable = false
+      flush()
+      val raf = new RandomAccessFile(file, "rws")
+      try {
+        val newLength = entries * 8
+        raf.setLength(newLength)
+        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, newLength)
+      } finally {
+        Utils.swallow(raf.close())
+      }
+    }
+  }
+  
+  /**
+   * Flush the data in the index to disk
+   */
+  def flush() {
+    this synchronized {
+      mmap.force()
+    }
+  }
+  
+  /**
+   * Delete this index file
+   */
+  def delete(): Boolean = {
+    this.file.delete()
+  }
+  
+  /** The number of entries in this index */
+  def entries() = size.get
+  
+  /** Close the index */
+  def close() {
+    if(mutable)
+      makeReadOnly()
+  }
+  
+  /**
+   * Round a number to the greatest exact multiple of the given factor less than the given number.
+   * E.g. roundToExactMultiple(67, 8) == 64
+   */
+  private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+}
\ No newline at end of file

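The header comment above describes the on-disk layout: fixed 8-byte slots holding a 4-byte offset relative to the segment's base offset followed by a 4-byte file position. A minimal standalone sketch of that encoding (hypothetical object and values, not part of this commit) might look like:

import java.nio.ByteBuffer

// Hypothetical sketch of the 8-byte slot layout used by OffsetIndex:
// [4-byte offset relative to baseOffset][4-byte file position].
object OffsetIndexFormatSketch {
  def main(args: Array[String]) {
    val baseOffset = 50L
    val buf = ByteBuffer.allocate(3 * 8)
    // entries for logical offsets 50, 53 and 55 at file positions 0, 120 and 300
    for((logical, position) <- Seq((50L, 0), (53L, 120), (55L, 300))) {
      buf.putInt((logical - baseOffset).toInt) // relative offset fits in 4 bytes
      buf.putInt(position)
    }
    // read back slot 1 and translate to a full offset, as lookup() does
    val slot = 1
    println("offset=" + (baseOffset + buf.getInt(slot * 8)) +
            " position=" + buf.getInt(slot * 8 + 4))
  }
}

A lookup for offset 54 would land on this same slot, since the index returns the greatest entry less than or equal to the target.
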
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetPosition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetPosition.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetPosition.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetPosition.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+/**
+ * The mapping between a logical log offset and the physical position
+ * in some log file of the beginning of the message set entry with the
+ * given offset.
+ */
+case class OffsetPosition(val offset: Long, val position: Int)
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala Mon Oct  8 19:13:24 2012
@@ -72,8 +72,8 @@ private[log] class SegmentList[T](seq: S
    * Delete the items from position (newEnd + 1) until end of list
    */
   def truncLast(newEnd: Int): Seq[T] = {
-    if (newEnd < 0 || newEnd > contents.get().length-1)
-      throw new KafkaException("End index must be positive and less than segment list size.");
+    if (newEnd < 0 || newEnd >= contents.get().length)
+      throw new KafkaException("Attempt to truncate segment list of length %d to %d.".format(contents.get().size, newEnd));
     var deleted: Array[T] = null
     val curr = contents.get()
     if (curr.length > 0) {
@@ -95,6 +95,6 @@ private[log] class SegmentList[T](seq: S
   /**
    * Nicer toString method
    */
-  override def toString(): String = view.toString
+  override def toString(): String = "SegmentList(%s)".format(view.mkString(", "))
   
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Mon Oct  8 19:13:24 2012
@@ -17,12 +17,72 @@
 
 package kafka.message
 
+import scala.reflect.BeanProperty
 import kafka.utils.Logging
 import java.nio.ByteBuffer
 import java.nio.channels._
+import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, IOException}
+import java.util.zip._
+import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.IteratorTemplate
 import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException}
 
+object ByteBufferMessageSet {
+  
+  private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
+    if(messages.size == 0) {
+      return MessageSet.Empty.buffer
+    } else if(compressionCodec == NoCompressionCodec) {
+      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+      for(message <- messages)
+        writeMessage(buffer, message, offsetCounter.getAndIncrement)
+      buffer.rewind()
+      buffer
+    } else {
+      val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages))
+      val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream))
+      var offset = -1L
+      for(message <- messages) {
+        offset = offsetCounter.getAndIncrement
+        output.writeLong(offset)
+        output.writeInt(message.size)
+        output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+      }
+      output.close()
+      
+      val bytes = byteArrayStream.toByteArray
+      val message = new Message(bytes, compressionCodec)
+      val buffer = ByteBuffer.allocate(message.size + MessageSet.LogOverhead)
+      writeMessage(buffer, message, offset)
+      buffer.rewind()
+      buffer
+    }
+  }
+  
+  def decompress(message: Message): ByteBufferMessageSet = {
+    val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream
+    val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
+    val intermediateBuffer = new Array[Byte](1024)
+    val compressed = CompressionFactory(message.compressionCodec, inputStream)
+    Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+      outputStream.write(intermediateBuffer, 0, dataRead)
+    }
+    compressed.close()
+
+    val outputBuffer = ByteBuffer.allocate(outputStream.size)
+    outputBuffer.put(outputStream.toByteArray)
+    outputBuffer.rewind
+    new ByteBufferMessageSet(outputBuffer)
+  }
+    
+  private def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
+    buffer.putLong(offset)
+    buffer.putInt(message.size)
+    buffer.put(message.buffer)
+    message.buffer.rewind()
+  }
+}
+
 /**
  * A sequence of messages stored in a byte buffer
  *
@@ -33,31 +93,34 @@ import kafka.common.{MessageSizeTooLarge
  * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
  * 
  */
-class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet with Logging {
+class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet with Logging {
   private var shallowValidByteCount = -1L
   if(sizeInBytes > Int.MaxValue)
     throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L)
+    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
+  }
+  
+  def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
+    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*))
   }
 
   def this(messages: Message*) {
-    this(NoCompressionCodec, messages: _*)
+    this(NoCompressionCodec, new AtomicLong(0), messages: _*)
   }
 
-  def validBytes: Long = shallowValidBytes
-
   private def shallowValidBytes: Long = {
     if(shallowValidByteCount < 0) {
+      var bytes = 0
       val iter = this.internalIterator(true)
       while(iter.hasNext) {
         val messageAndOffset = iter.next
-        shallowValidByteCount = messageAndOffset.offset
+        bytes += MessageSet.entrySize(messageAndOffset.message)
       }
+      this.shallowValidByteCount = bytes
     }
-    if(shallowValidByteCount < initialOffset) 0
-    else (shallowValidByteCount - initialOffset)
+    shallowValidByteCount
   }
   
   /** Write the messages in this set to the given channel */
@@ -74,71 +137,44 @@ class ByteBufferMessageSet(val buffer: B
   /** iterator over compressed messages without decompressing */
   def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
 
-  def verifyMessageSize(maxMessageSize: Int){
-    var shallowIter = internalIterator(true)
-    while(shallowIter.hasNext){
-      var messageAndOffset = shallowIter.next
-      val payloadSize = messageAndOffset.message.payloadSize
-      if ( payloadSize > maxMessageSize)
-        throw new MessageSizeTooLargeException("payload size of " + payloadSize + " larger than " + maxMessageSize)
-    }
-  }
-
-  /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
+  /** When isShallow is true we do a shallow iteration: just traverse the first level of messages without decompressing nested message sets. **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
-      var currValidBytes = initialOffset
-      var innerIter:Iterator[MessageAndOffset] = null
-      var lastMessageSize = 0L
+      var innerIter: Iterator[MessageAndOffset] = null
 
-      def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
+      def innerDone():Boolean = (innerIter == null || !innerIter.hasNext)
 
       def makeNextOuter: MessageAndOffset = {
-        if (topIter.remaining < 4) {
+        // if there isn't at least an offset and size, we are done
+        if (topIter.remaining < 12)
           return allDone()
-        }
+        val offset = topIter.getLong()
         val size = topIter.getInt()
-        lastMessageSize = size
-
-        trace("Remaining bytes in iterator = " + topIter.remaining)
-        trace("size of data = " + size)
-
-        if(size < 0 || topIter.remaining < size) {
-          if (currValidBytes == initialOffset || size < 0)
-            throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
-              topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
-              "the fetch size; (2) log corruption )")
+        if(size < 0)
+          throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
+        
+        // we have an incomplete message
+        if(topIter.remaining < size)
           return allDone()
-        }
+          
+        // read the current message and check correctness
         val message = topIter.slice()
         message.limit(size)
         topIter.position(topIter.position + size)
         val newMessage = new Message(message)
-        if(!newMessage.isValid)
-          throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec
-            + " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
 
-        if(isShallow){
-          currValidBytes += 4 + size
-          trace("shallow iterator currValidBytes = " + currValidBytes)
-          new MessageAndOffset(newMessage, currValidBytes)
-        }
-        else{
+        if(isShallow) {
+          new MessageAndOffset(newMessage, offset)
+        } else {
           newMessage.compressionCodec match {
             case NoCompressionCodec =>
-              debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
               innerIter = null
-              currValidBytes += 4 + size
-              trace("currValidBytes = " + currValidBytes)
-              new MessageAndOffset(newMessage, currValidBytes)
+              new MessageAndOffset(newMessage, offset)
             case _ =>
-              debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
-              innerIter = CompressionUtils.decompress(newMessage).internalIterator()
-              if (!innerIter.hasNext) {
-                currValidBytes += 4 + lastMessageSize
+              innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator()
+              if(!innerIter.hasNext)
                 innerIter = null
-              }
               makeNext()
           }
         }
@@ -147,50 +183,62 @@ class ByteBufferMessageSet(val buffer: B
       override def makeNext(): MessageAndOffset = {
         if(isShallow){
           makeNextOuter
-        }
-        else{
-          val isInnerDone = innerDone()
-          debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
-          isInnerDone match {
-            case true => makeNextOuter
-            case false => {
-              val messageAndOffset = innerIter.next
-              if (!innerIter.hasNext)
-                currValidBytes += 4 + lastMessageSize
-              new MessageAndOffset(messageAndOffset.message, currValidBytes)
-            }
-          }
+        } else {
+          if(innerDone())
+            makeNextOuter
+          else
+            innerIter.next
         }
       }
+      
     }
   }
-
-  def sizeInBytes: Long = buffer.limit
   
-  override def toString: String = {
-    val builder = new StringBuilder()
-    builder.append("ByteBufferMessageSet(")
-    for(message <- this) {
-      builder.append(message)
-      builder.append(", ")
+  /**
+   * Update the offsets for this message set. This method attempts to do an in-place conversion
+   * if there is no compression, but otherwise recopies the messages
+   */
+  private[kafka] def assignOffsets(offsetCounter: AtomicLong, codec: CompressionCodec): ByteBufferMessageSet = {
+    if(codec == NoCompressionCodec) {
+      // do an in-place conversion
+      var position = 0
+      buffer.mark()
+      while(position < sizeInBytes - MessageSet.LogOverhead) {
+        buffer.position(position)
+        buffer.putLong(offsetCounter.getAndIncrement())
+        position += MessageSet.LogOverhead + buffer.getInt()
+      }
+      buffer.reset()
+      this
+    } else {
+      // messages are compressed: crack open the message set and recompress it with the correct offsets
+      val messages = this.internalIterator(isShallow = false).map(_.message)
+      new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
     }
-    builder.append(")")
-    builder.toString
   }
+ 
+
+  /**
+   * The total number of bytes in this message set, including any partial trailing messages
+   */
+  def sizeInBytes: Long = buffer.limit
+  
+  /**
+   * The total number of bytes in this message set not including any partial, trailing messages
+   */
+  def validBytes: Long = shallowValidBytes
 
+  /**
+   * Two message sets are equal if their respective byte buffers are equal
+   */
   override def equals(other: Any): Boolean = {
     other match {
-      case that: ByteBufferMessageSet =>
-        buffer.equals(that.buffer) && initialOffset == that.initialOffset
+      case that: ByteBufferMessageSet => 
+        buffer.equals(that.buffer)
       case _ => false
     }
   }
 
-  override def hashCode: Int = {
-    var hash = 17
-    hash = hash * 31 + buffer.hashCode
-    hash = hash * 31 + initialOffset.hashCode
-    hash
-  }
+  override def hashCode: Int = buffer.hashCode
 
 }

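The assignOffsets method above walks the buffer using the fixed [8-byte offset][4-byte size][N bytes] entry framing shared with MessageSet. Here is a small standalone sketch of that walk over a hand-built buffer (hypothetical object and payloads, assuming the 12-byte LogOverhead defined in MessageSet):

import java.nio.ByteBuffer

// Hypothetical sketch of the [8-byte offset][4-byte size][payload] framing
// that assignOffsets relies on; the buffer is hand-built rather than holding
// real serialized Message contents.
object MessageSetFramingSketch {
  val LogOverhead = 12 // 8-byte offset + 4-byte size, as in MessageSet

  def main(args: Array[String]) {
    val payloads = Seq("a".getBytes("UTF-8"), "bb".getBytes("UTF-8"))
    val buf = ByteBuffer.allocate(payloads.map(_.length + LogOverhead).sum)
    var offset = 0L
    for(p <- payloads) {
      buf.putLong(offset)  // logical offset of this entry
      buf.putInt(p.length) // size N of the bytes that follow
      buf.put(p)
      offset += 1
    }
    buf.rewind()
    // re-stamp the offsets in place, mirroring the NoCompressionCodec branch
    var position = 0
    var next = 100L
    while(position < buf.limit - LogOverhead) {
      buf.position(position)
      buf.putLong(next)
      next += 1
      position += LogOverhead + buf.getInt()
    }
    println("re-stamped " + (next - 100) + " entries starting at offset 100")
  }
}
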
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.io.OutputStream
+import java.io.ByteArrayOutputStream
+import java.util.zip.GZIPOutputStream
+import java.util.zip.GZIPInputStream
+import java.io.IOException
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+import kafka.utils._
+
+object CompressionFactory {
+  
+  def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
+    compressionCodec match {
+      case DefaultCompressionCodec => new GZIPOutputStream(stream)
+      case GZIPCompressionCodec => new GZIPOutputStream(stream)
+      case SnappyCompressionCodec => 
+        import org.xerial.snappy.SnappyOutputStream
+        new SnappyOutputStream(stream)
+      case _ =>
+        throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
+    }
+  }
+  
+  def apply(compressionCodec: CompressionCodec, stream: InputStream): InputStream = {
+    compressionCodec match {
+      case DefaultCompressionCodec => new GZIPInputStream(stream)
+      case GZIPCompressionCodec => new GZIPInputStream(stream)
+      case SnappyCompressionCodec => 
+        import org.xerial.snappy.SnappyInputStream
+        new SnappyInputStream(stream)
+      case _ =>
+        throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
+    }
+  }
+}

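As a usage sketch, the two apply overloads above can be paired for an in-memory round trip. This assumes the GZIPCompressionCodec case object already defined in kafka.message; the object name below is hypothetical:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}
import kafka.message.{CompressionFactory, GZIPCompressionCodec}

// Hypothetical round trip through CompressionFactory: compress some bytes
// into an in-memory stream, then read them back through the matching
// input-stream wrapper.
object CompressionFactorySketch {
  def main(args: Array[String]) {
    val raw = "hello, kafka".getBytes("UTF-8")

    // compress
    val sink = new ByteArrayOutputStream()
    val out = CompressionFactory(GZIPCompressionCodec, sink)
    out.write(raw)
    out.close()

    // decompress
    val in = CompressionFactory(GZIPCompressionCodec, new ByteArrayInputStream(sink.toByteArray))
    val buf = new Array[Byte](raw.length)
    var read = 0
    while(read < buf.length) {
      val n = in.read(buf, read, buf.length - read)
      if(n < 0) throw new IOException("unexpected end of compressed stream")
      read += n
    }
    in.close()
    println(new String(buf, "UTF-8")) // prints: hello, kafka
  }
}
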
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala Mon Oct  8 19:13:24 2012
@@ -18,145 +18,214 @@
 package kafka.message
 
 import java.nio._
+import scala.math._
 import kafka.utils._
-import kafka.common.UnknownMagicByteException
 
 /**
- * Message byte offsets
+ * Constants related to messages
  */
 object Message {
-  val MagicVersion: Byte = 1
-  val CurrentMagicValue: Byte = 1
-  val MagicOffset = 0
-  val MagicLength = 1
-  val AttributeOffset = MagicOffset + MagicLength
-  val AttributeLength = 1
+  
   /**
-   * Specifies the mask for the compression code. 2 bits to hold the compression codec.
-   * 0 is reserved to indicate no compression
+   * The current offset and size for all the fixed-length fields
    */
-  val CompressionCodeMask: Int = 0x03  //
-
-
-  val NoCompression:Int = 0
-
+  val CrcOffset = 0
+  val CrcLength = 4
+  val MagicOffset = CrcOffset + CrcLength
+  val MagicLength = 1
+  val AttributesOffset = MagicOffset + MagicLength
+  val AttributesLength = 1
+  val KeySizeOffset = AttributesOffset + AttributesLength
+  val KeySizeLength = 4
+  val KeyOffset = KeySizeOffset + KeySizeLength
+  val MessageOverhead = KeyOffset
+  
   /**
-   * Computes the CRC value based on the magic byte
-   * @param magic Specifies the magic byte value. The only value allowed currently is 1.
+   * The minimum valid size for the message header
    */
-  def crcOffset(magic: Byte): Int = magic match {
-    case MagicVersion => AttributeOffset + AttributeLength
-    case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
-  }
+  val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength
   
-  val CrcLength = 4
-
   /**
-   * Computes the offset to the message payload based on the magic byte
-   * @param magic Specifies the magic byte value. The only value allowed currently is 1.
+   * The current "magic" value
    */
-  def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength
+  val CurrentMagicValue: Byte = 2
 
   /**
-   * Computes the size of the message header based on the magic byte
-   * @param magic Specifies the magic byte value. The only value allowed currently is 1.
+   * Specifies the mask for the compression code. 2 bits to hold the compression codec.
+   * 0 is reserved to indicate no compression
    */
-  def headerSize(magic: Byte): Int = payloadOffset(magic)
+  val CompressionCodeMask: Int = 0x03 
 
   /**
-   * Size of the header for magic byte 1. This is the minimum size of any message header.
+   * Compression code for uncompressed messages
    */
-  val MinHeaderSize = headerSize(1);
+  val NoCompression: Int = 0
+
 }
 
 /**
  * A message. The format of an N byte message is the following:
  *
- * 1. 1 byte "magic" identifier to allow format changes, whose value is 1 currently
- *
- * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
- *
- * 3. 4 byte CRC32 of the payload
- *
- * 4. N - 6 byte payload
+ * 1. 4 byte CRC32 of the message
+ * 2. 1 byte "magic" identifier to allow format changes, value is 2 currently
+ * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+ * 4. 4 byte key length, containing length K
+ * 5. K byte key
+ * 6. (N - K - 10) byte payload
  * 
+ * The default constructor wraps an existing ByteBuffer in a Message object without changing its contents.
  */
 class Message(val buffer: ByteBuffer) {
   
   import kafka.message.Message._
-
-
-  private def this(checksum: Long, bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = {
-    this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + size))
-    buffer.put(CurrentMagicValue)
+  
+  /**
+   * A constructor to create a Message
+   * @param bytes The payload of the message
+   * @param key The key of the message (null, if none)
+   * @param codec The compression codec used on the contents of the message (if any)
+   * @param payloadOffset The offset into the payload array used to extract payload
+   * @param payloadSize The size of the payload to use
+   */
+  def this(bytes: Array[Byte], 
+           key: Array[Byte],            
+           codec: CompressionCodec, 
+           payloadOffset: Int, 
+           payloadSize: Int) = {
+    this(ByteBuffer.allocate(Message.CrcLength + 
+                             Message.MagicLength + 
+                             Message.AttributesLength + 
+                             Message.KeySizeLength + 
+                             (if(key == null) 0 else key.length) + 
+                             (if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)))
+    // skip crc, we will fill that in at the end
+    buffer.put(MagicOffset, CurrentMagicValue)
     var attributes:Byte = 0
-    if (compressionCodec.codec > 0) {
-      attributes =  (attributes | (Message.CompressionCodeMask & compressionCodec.codec)).toByte
+    if (codec.codec > 0)
+      attributes =  (attributes | (CompressionCodeMask & codec.codec)).toByte
+    buffer.put(AttributesOffset, attributes)
+    if(key == null) {
+      buffer.putInt(KeySizeOffset, -1)
+      buffer.position(KeyOffset)
+    } else {
+      buffer.putInt(KeySizeOffset, key.length)
+      buffer.position(KeyOffset)
+      buffer.put(key, 0, key.length)
     }
-    buffer.put(attributes)
-    Utils.putUnsignedInt(buffer, checksum)
-    buffer.put(bytes, offset, size)
+    buffer.put(bytes, payloadOffset, if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)
     buffer.rewind()
+    
+    // now compute the checksum and fill it in
+    Utils.putUnsignedInt(buffer, CrcOffset, computeChecksum)
+  }
+  
+  def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = 
+    this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1)
+  
+  def this(bytes: Array[Byte], codec: CompressionCodec) = 
+    this(bytes = bytes, key = null, codec = codec)
+  
+  def this(bytes: Array[Byte], key: Array[Byte]) = 
+    this(bytes = bytes, key = key, codec = NoCompressionCodec)
+    
+  def this(bytes: Array[Byte]) = 
+    this(bytes = bytes, key = null, codec = NoCompressionCodec)
+    
+  /**
+   * Compute the checksum of the message from the message contents
+   */
+  def computeChecksum(): Long = 
+    Utils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)
+  
+  /**
+   * Retrieve the previously computed CRC for this message
+   */
+  def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset)
+  
+  /**
+   * Returns true if the crc stored with the message matches the crc computed off the message contents
+   */
+  def isValid: Boolean = checksum == computeChecksum
+  
+  /**
+   * Throw an InvalidMessageException if isValid is false for this message
+   */
+  def ensureValid() {
+    if(!isValid)
+      throw new InvalidMessageException("Message is corrupt (stored crc = " + checksum + ", computed crc = " + computeChecksum() + ")")
   }
-
-  def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, 0, bytes.length, NoCompressionCodec)
-
-  def this(bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = {
-    //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there
-    this(Utils.crc32(bytes, offset, size), bytes, offset, size, compressionCodec)
-  }
-
-  def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = this(bytes, 0, bytes.length, compressionCodec)
-
-  def this(bytes: Array[Byte], offset: Int, size: Int) = this(bytes, offset, size, NoCompressionCodec)
-
-  def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length, NoCompressionCodec)
   
+  /**
+   * The complete serialized size of this message in bytes (including crc, header attributes, etc)
+   */
   def size: Int = buffer.limit
   
-  def payloadSize: Int = size - headerSize(magic)
+  /**
+   * The length of the key in bytes
+   */
+  def keySize: Int = buffer.getInt(Message.KeySizeOffset)
+  
+  /**
+   * Does the message have a key?
+   */
+  def hasKey: Boolean = keySize >= 0
+  
+  /**
+   * The length of the message value in bytes
+   */
+  def payloadSize: Int = size - KeyOffset - max(0, keySize)
   
+  /**
+   * The magic version of this message
+   */
   def magic: Byte = buffer.get(MagicOffset)
   
-  def attributes: Byte = buffer.get(AttributeOffset)
+  /**
+   * The attributes stored with this message
+   */
+  def attributes: Byte = buffer.get(AttributesOffset)
   
-  def compressionCodec:CompressionCodec = {
-    magic match {
-      case 0 => NoCompressionCodec
-      case 1 => CompressionCodec.getCompressionCodec(buffer.get(AttributeOffset) & CompressionCodeMask)
-      case _ => throw new RuntimeException("Invalid magic byte " + magic)
-    }
-
-  }
-
-  def checksum: Long = Utils.getUnsignedInt(buffer, crcOffset(magic))
+  /**
+   * The compression codec used with this message
+   */
+  def compressionCodec: CompressionCodec = 
+    CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)
   
+  /**
+   * A ByteBuffer containing the content of the message
+   */
   def payload: ByteBuffer = {
     var payload = buffer.duplicate
-    payload.position(headerSize(magic))
+    payload.position(KeyOffset + max(keySize, 0))
     payload = payload.slice()
     payload.limit(payloadSize)
     payload.rewind()
     payload
   }
   
-  def isValid: Boolean =
-    checksum == Utils.crc32(buffer.array, buffer.position + buffer.arrayOffset + payloadOffset(magic), payloadSize)
-
-  def serializedSize: Int = 4 /* int size*/ + buffer.limit
-   
-  def serializeTo(serBuffer:ByteBuffer) = {
-    serBuffer.putInt(buffer.limit)
-    serBuffer.put(buffer.duplicate)
+  /**
+   * A ByteBuffer containing the message key
+   */
+  def key: ByteBuffer = {
+    val s = keySize
+    if(s < 0) {
+      null
+    } else {
+      var key = buffer.duplicate
+      key.position(KeyOffset)
+      key = key.slice()
+      key.limit(s)
+      key.rewind()
+      key
+    }
   }
 
   override def toString(): String = 
-    "message(magic = %d, attributes = %d, crc = %d, payload = %s)".format(magic, attributes, checksum, payload)
+    "Message(magic = %d, attributes = %d, crc = %d, key = %s, payload = %s)".format(magic, attributes, checksum, key, payload)
   
   override def equals(any: Any): Boolean = {
     any match {
-      case that: Message => size == that.size && attributes == that.attributes && checksum == that.checksum &&
-        payload == that.payload && magic == that.magic
+      case that: Message => this.buffer.equals(that.buffer)
       case _ => false
     }
   }

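A short sketch of the new keyed message format described in the class header, using only the constructors and accessors added above (the object name is hypothetical):

import kafka.message.Message

// Hypothetical look at the new keyed message layout: 4-byte crc, 1-byte magic,
// 1-byte attributes, 4-byte key length, key bytes, then the payload.
object MessageFormatSketch {
  def main(args: Array[String]) {
    val msg = new Message("value".getBytes("UTF-8"), "key".getBytes("UTF-8"))
    println("size        = " + msg.size)        // 10-byte header + 3-byte key + 5-byte payload = 18
    println("magic       = " + msg.magic)       // 2 in this revision
    println("hasKey      = " + msg.hasKey)      // true
    println("keySize     = " + msg.keySize)     // 3
    println("payloadSize = " + msg.payloadSize) // 5
    msg.ensureValid()                           // recomputes the crc and compares it to the stored one
  }
}
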
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala Mon Oct  8 19:13:24 2012
@@ -18,5 +18,11 @@
 package kafka.message
 
 
-case class MessageAndOffset(message: Message, offset: Long)
+case class MessageAndOffset(message: Message, offset: Long) {
+  
+  /**
+   * Compute the offset of the next message in the log
+   */
+  def nextOffset: Long = offset + 1
+}
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Mon Oct  8 19:13:24 2012
@@ -24,9 +24,11 @@ import java.nio.channels._
  * Message set helper functions
  */
 object MessageSet {
-  
-  val LogOverhead = 4
-  val Empty: MessageSet = new ByteBufferMessageSet(ByteBuffer.allocate(0))
+
+  val MessageSizeLength = 4
+  val OffsetLength = 8
+  val LogOverhead = MessageSizeLength + OffsetLength
+  val Empty = new ByteBufferMessageSet(ByteBuffer.allocate(0))
   
   /**
    * The size of a message set containing the given messages
@@ -52,37 +54,15 @@ object MessageSet {
    */
   def entrySize(message: Message): Int = LogOverhead + message.size
 
-  def createByteBuffer(compressionCodec: CompressionCodec, messages: Message*): ByteBuffer =
-    compressionCodec match {
-      case NoCompressionCodec =>
-        val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-        for (message <- messages) {
-          message.serializeTo(buffer)
-        }
-        buffer.rewind
-        buffer
-      case _ =>
-        messages.size match {
-          case 0 =>
-            val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-            buffer.rewind
-            buffer
-          case _ =>
-            val message = CompressionUtils.compress(messages, compressionCodec)
-            val buffer = ByteBuffer.allocate(message.serializedSize)
-            message.serializeTo(buffer)
-            buffer.rewind
-            buffer
-        }
-    }
 }
 
 /**
- * A set of messages. A message set has a fixed serialized form, though the container
- * for the bytes could be either in-memory or on disk. A The format of each message is
+ * A set of messages with offsets. A message set has a fixed serialized form, though the container
+ * for the bytes could be either in-memory or on disk. The format of each message is
  * as follows:
+ * 8 byte message offset number
  * 4 byte size containing an integer N
- * N message bytes as described in the message class
+ * N message bytes as described in the Message class
  */
 abstract class MessageSet extends Iterable[MessageAndOffset] {
 
@@ -92,7 +72,7 @@ abstract class MessageSet extends Iterab
   def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Long): Long
   
   /**
-   * Provides an iterator over the messages in this set
+   * Provides an iterator over the message/offset pairs in this set
    */
   def iterator: Iterator[MessageAndOffset]
   
@@ -110,5 +90,19 @@ abstract class MessageSet extends Iterab
       if(!messageAndOffset.message.isValid)
         throw new InvalidMessageException
   }
+  
+  /**
+   * Print this message set's contents
+   */
+  override def toString: String = {
+    val builder = new StringBuilder()
+    builder.append(getClass.getSimpleName + "(")
+    for(message <- this) {
+      builder.append(message)
+      builder.append(", ")
+    }
+    builder.append(")")
+    builder.toString
+  }
 
 }

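With the 8-byte offset now part of each entry, the per-entry overhead grows from 4 to 12 bytes. A quick sketch of the size helpers (hypothetical object name, and assuming the messageSetSize(Iterable[Message]) helper this file already provides):

import kafka.message.{Message, MessageSet}

// Hypothetical check of the per-entry overhead: every entry in a message set
// costs its serialized Message size plus the 12-byte offset/size header.
object EntrySizeSketch {
  def main(args: Array[String]) {
    val m = new Message("payload".getBytes("UTF-8"))
    println("message size = " + m.size)
    println("entry size   = " + MessageSet.entrySize(m))           // m.size + LogOverhead (12)
    println("set size     = " + MessageSet.messageSetSize(Seq(m))) // same as entry size for a single message
  }
}
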
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Mon Oct  8 19:13:24 2012
@@ -88,7 +88,8 @@ object ConsoleProducer { 
 
     val props = new Properties()
     props.put("broker.list", brokerList)
-    props.put("compression.codec", DefaultCompressionCodec.codec.toString)
+    val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
+    props.put("compression.codec", codec.toString)
     props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
       props.put("batch.size", batchSize.toString)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Mon Oct  8 19:13:24 2012
@@ -22,6 +22,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
+import kafka.message.MessageAndOffset
 import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -36,12 +37,13 @@ import java.util.concurrent.TimeUnit
 abstract class  AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
+
   private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val fetchMapLock = new Object
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
   val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
   
-  // callbacks to be defined in subclass
+  /* callbacks to be defined in subclass */
 
   // process fetched data
   def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData)
@@ -100,12 +102,17 @@ abstract class  AbstractFetcherThread(na
             if (currentOffset.isDefined) {
               partitionData.error match {
                 case ErrorMapping.NoError =>
-                  processPartitionData(topic, currentOffset.get, partitionData)
-                  val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
-                  val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                  val validBytes = messages.validBytes
+                  val newOffset = messages.lastOption match {
+                    case Some(m: MessageAndOffset) => m.nextOffset
+                    case None => currentOffset.get
+                  }
                   fetchMap.put(topicAndPartition, newOffset)
                   FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
                   fetcherMetrics.byteRate.mark(validBytes)
+                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                  processPartitionData(topic, currentOffset.get, partitionData)
                 case ErrorMapping.OffsetOutOfRangeCode =>
                   val newOffset = handleOffsetOutOfRange(topic, partitionId)
                   fetchMap.put(topicAndPartition, newOffset)
@@ -122,7 +129,7 @@ abstract class  AbstractFetcherThread(na
       }
     }
 
-    if (partitionsWithError.size > 0) {
+    if(partitionsWithError.size > 0) {
       debug("handling partitions with error for %s".format(partitionsWithError))
       handlePartitionsWithErrors(partitionsWithError)
     }

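The change above switches the fetcher from byte-based to logical offset bookkeeping: the next fetch offset is now taken from the last fetched message's nextOffset rather than by adding validBytes to the previous offset. A minimal sketch of that bookkeeping (hypothetical object and starting offset):

import java.util.concurrent.atomic.AtomicLong
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

// Hypothetical sketch: with logical per-message offsets, the next fetch offset
// is the last fetched offset + 1 rather than the old offset plus a byte count.
object NextOffsetSketch {
  def main(args: Array[String]) {
    val fetched = new ByteBufferMessageSet(NoCompressionCodec,
                                           new AtomicLong(42),
                                           new Message("a".getBytes("UTF-8")),
                                           new Message("b".getBytes("UTF-8")))
    val newOffset = fetched.lastOption match {
      case Some(m) => m.nextOffset // messages got offsets 42 and 43, so this is 44
      case None => 42L             // nothing fetched, keep the current offset
    }
    println("next fetch offset = " + newOffset)
  }
}
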
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Oct  8 19:13:24 2012
@@ -29,6 +29,7 @@ import mutable.HashMap
 import scala.math._
 import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
 import kafka.common._
@@ -98,7 +99,7 @@ class KafkaApis(val requestChannel: Requ
    */
   def maybeUnblockDelayedFetchRequests(topic: String, partitionData: ProducerRequestPartitionData) {
     val partition = partitionData.partition
-    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), null)
+    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), partitionData)
     trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
 
     // send any newly unblocked responses
@@ -119,26 +120,30 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Handling producer request " + request.toString)
     trace("Handling producer request " + request.toString)
 
-    val localProduceResponse = produceToLocalLog(produceRequest)
+    val localProduceResults = appendToLocalLog(produceRequest.data)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
-    val numPartitionsInError = localProduceResponse.status.count(_._2.error != ErrorMapping.NoError)
+    val numPartitionsInError = localProduceResults.count(_.error.isDefined)
     produceRequest.data.foreach(partitionAndData =>
       maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
 
     if (produceRequest.requiredAcks == 0 ||
         produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
-        numPartitionsInError == produceRequest.numPartitions)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse)))
-    else {
+        numPartitionsInError == produceRequest.numPartitions) {
+      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
+      val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val producerRequestKeys = produceRequest.data.keys.map(
         topicAndPartition => new RequestKey(topicAndPartition)).toSeq
-
-      val delayedProduce = new DelayedProduce(
-        producerRequestKeys, request, localProduceResponse,
-        produceRequest, produceRequest.ackTimeoutMs.toLong)
+      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap
+      val delayedProduce = new DelayedProduce(producerRequestKeys, 
+                                              request,
+                                              statuses,
+                                              produceRequest, 
+                                              produceRequest.ackTimeoutMs.toLong)
       producerRequestPurgatory.watch(delayedProduce)
 
       /*
@@ -155,46 +160,48 @@ class KafkaApis(val requestChannel: Requ
       satisfiedProduceRequests.foreach(_.respond())
     }
   }
+  
+  case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
+    def this(key: TopicAndPartition, throwable: Throwable) = 
+      this(key, -1L, -1L, Some(throwable))
+    
+    def errorCode = error match {
+      case None => ErrorMapping.NoError
+      case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]])
+    }
+  }
 
   /**
    * Helper method for handling a parsed producer request
    */
-  private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
-    trace("Produce [%s] to local log ".format(request.toString))
-
-    val localErrorsAndOffsets = request.data.map (topicAndPartitionData => {
-      val (topic, partitionData) = (topicAndPartitionData._1.topic, topicAndPartitionData._2)
-      BrokerTopicStat.getBrokerTopicStat(topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
+  private def appendToLocalLog(messages: Map[TopicAndPartition, ProducerRequestPartitionData]): Iterable[ProduceResult] = {
+    trace("Append [%s] to local log ".format(messages.toString))
+    messages.map (data => {
+      val (key, partitionData) = data
+      BrokerTopicStat.getBrokerTopicStat(key.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
       BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
-
+ 
       try {
-        val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partitionData.partition)
+        val localReplica = replicaManager.getLeaderReplicaIfLocal(key.topic, key.partition)
         val log = localReplica.log.get
-        log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+        val (start, end) = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
         // we may need to increment high watermark since ISR could be down to 1
         localReplica.partition.maybeIncrementLeaderHW(localReplica)
-        val responseStatus = ProducerResponseStatus(ErrorMapping.NoError, log.logEndOffset)
-        trace("%d bytes written to logs, nextAppendOffset = %d"
-                      .format(partitionData.messages.sizeInBytes, responseStatus.nextOffset))
-        (TopicAndPartition(topic, partitionData.partition), responseStatus)
+        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
+              .format(partitionData.messages.sizeInBytes, key.topic, key.partition, start, end))
+        ProduceResult(key, start, end)
       } catch {
-        case e: Throwable =>
-          BrokerTopicStat.getBrokerTopicStat(topic).failedProduceRequestRate.mark()
+        case e: KafkaStorageException =>
+          fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
+          Runtime.getRuntime.halt(1)
+          null
+        case e => 
+          BrokerTopicStat.getBrokerTopicStat(key.topic).failedProduceRequestRate.mark()
           BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
-          error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e)
-          e match {
-            case _: KafkaStorageException =>
-              fatal("Halting due to unrecoverable I/O error while handling producer request", e)
-              Runtime.getRuntime.halt(1)
-            case _ =>
-          }
-          val (errorCode, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L)
-          (TopicAndPartition(topic, partitionData.partition), ProducerResponseStatus(errorCode, offset))
-      }
-    }
-    )
-
-    ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets)
+          error("Error processing ProducerRequest on %s:%d".format(key.topic, key.partition), e)
+          new ProduceResult(key, e)
+       }
+    })
   }
 
   /**
@@ -207,7 +214,7 @@ class KafkaApis(val requestChannel: Requ
     trace("Handling fetch request " + fetchRequest.toString)
 
     if(fetchRequest.isFromFollower) {
-      maybeUpdatePartitionHW(fetchRequest)
+      maybeUpdatePartitionHw(fetchRequest)
       // after updating HW, some delayed produce requests may be unblocked
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
       fetchRequest.requestInfo.foreach {
@@ -220,61 +227,24 @@ class KafkaApis(val requestChannel: Requ
       satisfiedProduceRequests.foreach(_.respond())
     }
 
-    // if there are enough bytes available right now we can answer the request, otherwise we have to punt
-    val availableBytes = availableFetchBytes(fetchRequest)
+    val dataRead = readMessageSets(fetchRequest)
+    val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum
     if(fetchRequest.maxWait <= 0 ||
-       availableBytes >= fetchRequest.minBytes ||
+       bytesReadable >= fetchRequest.minBytes ||
        fetchRequest.numPartitions <= 0) {
-      val topicData = readMessageSets(fetchRequest)
-      debug("Returning fetch response %s for fetch request with correlation id %d".format(
-        topicData.values.map(_.error).mkString(","), fetchRequest.correlationId))
-      val response = FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
+      debug("Returning fetch response %s for fetch request with correlation id %d".format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId))
+      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, dataRead)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
       debug("Putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
-      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
+      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
       fetchRequestPurgatory.watch(delayedFetch)
     }
   }
 
-  /**
-   * Calculate the number of available bytes for the given fetch request
-   */
-  private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
-    val totalBytes = fetchRequest.requestInfo.foldLeft(0L)((folded, curr) => {
-      folded +
-      {
-        val (topic, partition) = (curr._1.topic, curr._1.partition)
-        val (offset, fetchSize) = (curr._2.offset, curr._2.fetchSize)
-        debug("Fetching log for topic %s partition %d".format(topic, partition))
-        try {
-          val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
-          val end = if (!fetchRequest.isFromFollower) {
-            leader.highWatermark
-          } else {
-            leader.logEndOffset
-          }
-          val available = max(0, end - offset)
-          math.min(fetchSize, available)
-        } catch {
-          case e: UnknownTopicOrPartitionException =>
-            info("Invalid partition %d in fetch request from client %s."
-                         .format(partition, fetchRequest.clientId))
-            0
-          case e =>
-            warn("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
-                          .format(topic, partition, brokerId, fetchRequest.clientId), e)
-            0
-        }
-      }
-    })
-    trace(totalBytes + " available bytes for fetch request.")
-    totalBytes
-  }
-
-  private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
+  private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) {
     debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
     fetchRequest.requestInfo.foreach(info => {
       val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
@@ -290,21 +260,20 @@ class KafkaApis(val requestChannel: Requ
     val isFetchFromFollower = fetchRequest.isFromFollower
     fetchRequest.requestInfo.map {
       case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionData = try {
-          val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
-          BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
-          BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
-          if (!isFetchFromFollower) {
-            new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
-          } else {
-            debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+        val partitionData = 
+          try {
+            val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
+            BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+            if (!isFetchFromFollower) {
+              new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+            } else {
+              debug("Leader %d for topic %s partition %d received fetch request from follower %d"
                           .format(brokerId, topic, partition, fetchRequest.replicaId))
-            debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
-                          .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+
             new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
           }
-        }
-        catch {
+        } catch {
           case t: Throwable =>
             BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
             BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
@@ -319,19 +288,20 @@ class KafkaApis(val requestChannel: Requ
   /**
   * Read from a single topic/partition at the given offset up to maxSize bytes
    */
-  private def readMessageSet(topic: String, partition: Int, offset: Long,
-                             maxSize: Int, fromFollower: Boolean): (MessageSet, Long) = {
+  private def readMessageSet(topic: String, 
+                             partition: Int, 
+                             offset: Long,
+                             maxSize: Int, 
+                             fromFollower: Boolean): (MessageSet, Long) = {
     // check if the current broker is the leader for the partitions
     val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
     trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val actualSize = if (!fromFollower) {
-      min(leader.highWatermark - offset, maxSize).toInt
-    } else {
-      maxSize
-    }
     val messages = leader.log match {
       case Some(log) =>
-        log.read(offset, actualSize)
+        if(fromFollower)
+          log.read(startOffset = offset, maxLength = maxSize, maxOffset = None)
+        else
+          log.read(startOffset = offset, maxLength = maxSize, maxOffset = Some(leader.highWatermark))
       case None =>
         error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId))
         MessageSet.Empty
@@ -449,21 +419,24 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A delayed fetch request
    */
-  class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long)
-    extends DelayedRequest(keys, request, delayMs)
+  class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
+    extends DelayedRequest(keys, request, delayMs) {
+    val bytesAccumulated = new AtomicLong(initialSize)
+  }
 
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
-
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, ProducerRequestPartitionData](brokerId) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(n: Null, delayedFetch: DelayedFetch): Boolean =
-      availableFetchBytes(delayedFetch.fetch) >= delayedFetch.fetch.minBytes
+    def checkSatisfied(partitionData: ProducerRequestPartitionData, delayedFetch: DelayedFetch): Boolean = {
+      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(partitionData.messages.sizeInBytes)
+      accumulatedSize >= delayedFetch.fetch.minBytes
+    }
 
     /**
      * When a request expires just answer it with whatever data is present
@@ -479,12 +452,11 @@ class KafkaApis(val requestChannel: Requ
 
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
-                       localProduceResponse: ProducerResponse,
+                       initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus],
                        val produce: ProducerRequest,
                        delayMs: Long)
           extends DelayedRequest(keys, request, delayMs) with Logging {
 
-    private val initialErrorsAndOffsets = localProduceResponse.status
     /**
      * Map of (topic, partition) -> partition status
      * The values in this map don't need to be synchronized since updates to the
@@ -498,9 +470,9 @@ class KafkaApis(val requestChannel: Requ
       val (acksPending, error, nextOffset) =
         if (producerResponseStatus.error == ErrorMapping.NoError) {
           // Timeout error state will be cleared when requiredAcks are received
-          (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.nextOffset)
+          (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.offset)
         }
-        else (false, producerResponseStatus.error, producerResponseStatus.nextOffset)
+        else (false, producerResponseStatus.error, producerResponseStatus.offset)
 
       val initialStatus = PartitionStatus(acksPending, error, nextOffset)
       trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
@@ -579,8 +551,7 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
-
+  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey] {
     this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
 
     protected def checkSatisfied(followerFetchRequestKey: RequestKey,

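For readers skimming the purgatory changes above: a delayed fetch is no longer re-scanned for available bytes on every check; it is seeded with the bytes that were readable when the request arrived, and each subsequent produce simply adds its size to a running counter. Below is a minimal, self-contained sketch of that accumulation check only; FakeFetch, FakeDelayedFetch and DelayedFetchSketch are made-up stand-ins, not Kafka classes.

    import java.util.concurrent.atomic.AtomicLong

    // hypothetical stand-ins for FetchRequest / DelayedFetch, trimmed to the fields the check needs
    case class FakeFetch(minBytes: Int)
    class FakeDelayedFetch(val fetch: FakeFetch, initialSize: Long) {
      val bytesAccumulated = new AtomicLong(initialSize)
    }

    object DelayedFetchSketch {
      // mirrors the new checkSatisfied: add the newly produced bytes and
      // satisfy the fetch once the running total reaches minBytes
      def checkSatisfied(newlyProducedBytes: Long, delayed: FakeDelayedFetch): Boolean =
        delayed.bytesAccumulated.addAndGet(newlyProducedBytes) >= delayed.fetch.minBytes

      def main(args: Array[String]) {
        val delayed = new FakeDelayedFetch(FakeFetch(minBytes = 100), initialSize = 40L)
        println(checkSatisfied(30L, delayed))  // false: 70 < 100
        println(checkSatisfied(50L, delayed))  // true: 120 >= 100
      }
    }
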
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Mon Oct  8 19:13:24 2012
@@ -33,16 +33,32 @@ class KafkaConfig private (val props: Ve
   }
 
   def verify() = props.verify()
+  
+  /*********** General Configuration ***********/
+  
+  /* the broker id for this server */
+  val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
 
+  /* the maximum size of message that the server can receive */
+  val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
+  
+  /* the number of network threads that the server uses for handling network requests */
+  val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
+
+  /* the number of I/O threads that the server uses for processing network requests */
+  val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
+  
+  /* the number of queued requests allowed before blocking the network threads */
+  val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
+  
+  /*********** Socket Server Configuration ***********/
+  
   /* the port to listen and accept connections on */
   val port: Int = props.getInt("port", 6667)
 
   /* hostname of broker. If not set, the value returned from getLocalHost will be used. If there are multiple interfaces, getLocalHost may not be what you want. */
   val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress)
 
-  /* the broker id for this server */
-  val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
-
   /* the SO_SNDBUF buffer of the socket server sockets */
   val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
   
@@ -51,18 +67,8 @@ class KafkaConfig private (val props: Ve
   
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
-
-  /* the maximum size of message that the server can receive */
-  val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
   
-  /* the number of network threads that the server uses for handling network requests */
-  val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
-
-  /* the number of io threads that the server uses for carrying out network requests */
-  val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
-  
-  /* the number of queued requests allowed before blocking the network threads */
-  val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
+  /*********** Log Configuration ***********/
 
   /* the default number of log partitions per topic */
   val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
@@ -96,6 +102,12 @@ class KafkaConfig private (val props: Ve
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  
+  /* the maximum size in bytes of the offset index */
+  val logIndexMaxSizeBytes = props.getIntInRange("log.index.max.size", 10*1024*1024, (4, Int.MaxValue))
+  
+  /* the interval with which we add an entry to the offset index */
+  val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
   val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
@@ -112,9 +124,7 @@ class KafkaConfig private (val props: Ve
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
-  /**
-   * Following properties are relevant to Kafka replication
-   */
+  /*********** Replication configuration ***********/
 
   /* the socket timeout for controller-to-broker channels */
   val controllerSocketTimeoutMs = props.getInt("controller.socket.timeout.ms", 30000)
@@ -122,7 +132,6 @@ class KafkaConfig private (val props: Ve
   /* the buffer size for controller-to-broker-channels */
   val controllerMessageQueueSize= props.getInt("controller.message.queue.size", 10)
 
-
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
 
@@ -134,25 +143,22 @@ class KafkaConfig private (val props: Ve
 
   val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
 
-  /**
-   * Config options relevant to a follower for a replica
-   */
-  /** the socket timeout for network requests */
+  /* the socket timeout for network requests */
   val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
 
-  /** the socket receive buffer for network requests */
+  /* the socket receive buffer for network requests */
   val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
 
-  /** the number of byes of messages to attempt to fetch */
+  /* the number of bytes of messages to attempt to fetch */
   val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
 
-  /** max wait time for each fetcher request issued by follower replicas*/
+  /* max wait time for each fetch request issued by follower replicas */
   val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
 
-  /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
+  /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
   val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 4096)
 
   /* number of fetcher threads used to replicate messages from a source broker.
-  *  Increasing this value can increase the degree of I/O parallelism in the follower broker. */
+   * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
   val numReplicaFetchers = props.getInt("replica.fetchers", 1)
  }
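
The two new index settings above are ordinary broker properties. A sketch of a server.properties fragment using the property names from the code, with values equal to the code defaults (everything else omitted; adjust for a real deployment):

    # broker identity and listener port
    brokerid=0
    port=6667

    # offset index tuning introduced in this change (defaults shown)
    # maximum size in bytes of a segment's offset index file
    log.index.max.size=10485760
    # approximate number of log bytes between index entries
    log.index.interval.bytes=4096
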

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Mon Oct  8 19:13:24 2012
@@ -114,11 +114,13 @@ class KafkaServer(val config: KafkaConfi
       if(requestHandlerPool != null)
         requestHandlerPool.shutdown()
       kafkaScheduler.shutdown()
-      apis.close()
-      kafkaZookeeper.shutdown()
+      if(apis != null)
+        apis.close()
+      if(kafkaZookeeper != null)
+        kafkaZookeeper.shutdown()
       if(replicaManager != null)
         replicaManager.shutdown()
-      if (socketServer != null)
+      if(socketServer != null)
         socketServer.shutdown()
       if(logManager != null)
         logManager.shutdown()
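
The shutdown path now null-checks apis and kafkaZookeeper like the other components, so a broker that failed partway through startup can still be shut down without a NullPointerException. A tiny, hypothetical sketch of the pattern (Component and shutdownIfStarted are illustrative, not Kafka code):

    object GuardedShutdownSketch {
      trait Component { def shutdown(): Unit }

      // only shut a component down if it was actually constructed during startup
      def shutdownIfStarted(c: Component) {
        if(c != null)
          c.shutdown()
      }

      def main(args: Array[String]) {
        val started: Component = new Component { def shutdown() { println("stopped") } }
        val neverStarted: Component = null
        shutdownIfStarted(started)       // prints "stopped"
        shutdownIfStarted(neverStarted)  // no-op instead of an NPE
      }
    }
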

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Mon Oct  8 19:13:24 2012
@@ -36,11 +36,11 @@ class ReplicaFetcherThread(name:String, 
     val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
 
     if (fetchOffset != replica.logEndOffset)
-      throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset))
+      throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
     trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId,
       replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
     replica.log.get.append(messageSet)
-    trace("Follower %d has replica log end offset %d after appending %d messages"
+    trace("Follower %d has replica log end offset %d after appending %d bytes of messages"
       .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
     val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
     replica.highWatermark = followerHighWatermark
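
As the trace messages above suggest, the follower's high watermark is simply the smaller of its own log end offset and the high watermark the leader returned in the fetch response: the follower can expose neither offsets beyond what the leader has committed nor offsets it has not yet appended. A one-line sketch with plain Longs standing in for the real Replica and PartitionData types:

    object FollowerHwSketch {
      // follower HW = min(follower log end offset, leader-reported HW)
      def followerHighWatermark(followerLogEndOffset: Long, leaderHw: Long): Long =
        followerLogEndOffset.min(leaderHw)

      def main(args: Array[String]) {
        println(followerHighWatermark(120L, 100L))  // 100: cannot go past the leader's HW
        println(followerHighWatermark(80L, 100L))   // 80: cannot go past what it has appended
      }
    }
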

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Mon Oct  8 19:13:24 2012
@@ -19,38 +19,64 @@ package kafka.tools
 
 import java.io._
 import kafka.message._
+import kafka.log._
 import kafka.utils._
 
 object DumpLogSegments {
 
   def main(args: Array[String]) {
-    var isNoPrint = false;
-    for(arg <- args)
-      if ("-noprint".compareToIgnoreCase(arg) == 0)
-        isNoPrint = true;
+    val print = args.contains("--print")
+    val files = args.filter(_ != "--print")
 
-    for(arg <- args) {
-      if (! ("-noprint".compareToIgnoreCase(arg) == 0) ) {
-        val file = new File(arg)
+    for(arg <- files) {
+      val file = new File(arg)
+      if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        val startOffset = file.getName().split("\\.")(0).toLong
-        var offset = 0L
-        println("Starting offset: " + startOffset)
-        val messageSet = new FileMessageSet(file, false)
-        for(messageAndOffset <- messageSet) {
-          val msg = messageAndOffset.message
-          println("offset: " + (startOffset + offset) + " isvalid: " + msg.isValid +
-                  " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + " compresscodec: " + msg.compressionCodec)
-          if (!isNoPrint)
-            println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
-          offset = messageAndOffset.offset
-        }
-        val endOffset = startOffset + offset
-        println("Tail of the log is at offset: " + endOffset)
-        if (messageSet.sizeInBytes != endOffset)
-          println("Log corrupted from " + endOffset + " to " + messageSet.sizeInBytes + "!!!")
+        dumpLog(file, print)
+      } else if(file.getName.endsWith(Log.IndexFileSuffix)){
+        println("Dumping " + file)
+        dumpIndex(file)
+      }
+    }
+  }
+  
+  /* print out the contents of the index */
+  def dumpIndex(file: File) {
+    val startOffset = file.getName().split("\\.")(0).toLong
+    val index = new OffsetIndex(file = file, baseOffset = startOffset, mutable = false)
+    for(i <- 0 until index.entries) {
+      val entry = index.entry(i)
+      // since it is a sparse file, a crash may leave many zero entries; stop when we see one
+      if(entry.offset <= startOffset)
+        return
+      println("offset: %d position: %d".format(entry.offset, entry.position))
+    }
+  }
+  
+  /* print out the contents of the log */
+  def dumpLog(file: File, printContents: Boolean) {
+    val startOffset = file.getName().split("\\.")(0).toLong
+    println("Starting offset: " + startOffset)
+    val messageSet = new FileMessageSet(file, false)
+    var validBytes = 0L
+    for(messageAndOffset <- messageSet) {
+      val msg = messageAndOffset.message
+      validBytes += MessageSet.entrySize(msg)
+      print("offset: " + messageAndOffset.offset + " isvalid: " + msg.isValid +
+            " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + 
+            " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
+      if(msg.hasKey)
+        print(" keysize: " + msg.keySize)
+      if(printContents) {
+        if(msg.hasKey)
+          print(" key: " + Utils.toString(msg.key, "UTF-8"))
+        print(" payload: " + Utils.toString(msg.payload, "UTF-8"))
       }
+      println()
     }
+    val trailingBytes = messageSet.sizeInBytes - validBytes
+    if(trailingBytes > 0)
+      println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
   
 }
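
The reworked tool now dispatches on the file suffix, dumping .log segments (optionally with payloads when --print is given) and .index files. A sketch of invoking it, assuming the standard kafka-run-class.sh wrapper; the segment paths are hypothetical:

    # dump a log segment, printing message payloads
    ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print /tmp/kafka-logs/mytopic-0/00000000000000000000.log

    # dump the corresponding offset index
    ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/kafka-logs/mytopic-0/00000000000000000000.index
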

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Mon Oct  8 19:13:24 2012
@@ -108,7 +108,7 @@ object SimpleConsumerShell extends Loggi
           for(messageAndOffset <- messageSet) {
             if(printMessages)
               info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
-            offset = messageAndOffset.offset
+            offset = messageAndOffset.nextOffset
             if(printOffsets)
               info("next offset = " + offset)
             consumed += 1
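
With logical, per-message offsets the next fetch has to resume at messageAndOffset.nextOffset (the offset following this message) rather than at the message's own offset, which would refetch the last message. A small sketch of the loop shape; MessageAndOffsetLike is a made-up stand-in, not the real MessageAndOffset:

    object NextOffsetSketch {
      // stand-in: with the new format nextOffset is simply offset + 1
      case class MessageAndOffsetLike(offset: Long) {
        def nextOffset: Long = offset + 1
      }

      def main(args: Array[String]) {
        var fetchOffset = 0L
        val batch = Seq(MessageAndOffsetLike(0L), MessageAndOffsetLike(1L), MessageAndOffsetLike(2L))
        for(mo <- batch)
          fetchOffset = mo.nextOffset            // resume after the last consumed message
        println("next fetch starts at offset " + fetchOffset)  // 3
      }
    }
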

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Oct  8 19:13:24 2012
@@ -295,11 +295,11 @@ object ZkUtils extends Logging {
   def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeData(path, data, expectVersion)
-      info("Conditional update the zkPath %s with expected version %d succeed and return the new version: %d".format(path, expectVersion, stat.getVersion))
+      debug("Conditional update to the zookeeper path %s with expected version %d succeeded and returned the new version: %d".format(path, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
       case e: Exception =>
-        info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion))
+        debug("Conditional update to the zookeeper path %s with expected version %d failed".format(path, expectVersion), e)
         (false, -1)
     }
   }
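
conditionalUpdatePersistentPath returns (succeeded, newVersion), and a failed compare-and-set is an expected outcome that callers retry after re-reading, which is why the logging above drops from info to debug. A hedged sketch of a caller-side retry loop; readDataAndVersion and makeNewValue are assumed helpers written for this example, not ZkUtils methods:

    import org.I0Itec.zkclient.ZkClient
    import org.apache.zookeeper.data.Stat
    import kafka.utils.ZkUtils

    object ConditionalUpdateSketch {
      // assumed helper: read the current value together with its zookeeper version
      def readDataAndVersion(client: ZkClient, path: String): (String, Int) = {
        val stat = new Stat()
        (client.readData[String](path, stat), stat.getVersion)
      }

      // assumed helper: derive the new value from the old one
      def makeNewValue(old: String): String = old + "!"

      // optimistic update: retry whenever another writer bumped the node's version first
      def updateWithRetry(client: ZkClient, path: String): Int = {
        var result = -1
        var done = false
        while(!done) {
          val (data, version) = readDataAndVersion(client, path)
          val (ok, newVersion) = ZkUtils.conditionalUpdatePersistentPath(client, path, makeNewValue(data), version)
          if(ok) {
            result = newVersion
            done = true
          }
          // otherwise another writer won the race; re-read and try again with the fresh version
        }
        result
      }
    }
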

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala Mon Oct  8 19:13:24 2012
@@ -33,7 +33,7 @@ object TestLogPerformance {
     val props = TestUtils.createBrokerConfig(0, -1)
     val config = new KafkaConfig(props)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, false, SystemTime)
+    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)


