kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1399934 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: log/FileMessageSet.scala log/Log.scala log/LogSegment.scala message/ByteBufferMessageSet.scala server/ReplicaFetcherThread.scala tools/DumpLogSegments.scala
Date Fri, 19 Oct 2012 00:09:33 GMT
Author: junrao
Date: Fri Oct 19 00:09:32 2012
New Revision: 1399934

URL: http://svn.apache.org/viewvc?rev=1399934&view=rev
Log:
System Test : Leader Failure Log Segment Checksum Mismatched When request-num-acks is 1; patched
by Jun Rao; reviewed by John Fung, Joel Koshy and Neha Narkhede; KAFKA-573

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala?rev=1399934&r1=1399933&r2=1399934&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala Fri Oct 19 00:09:32 2012
@@ -37,14 +37,17 @@ import kafka.metrics.{KafkaTimer, KafkaM
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
                                     private[log] val start: Long = 0L,
-                                    private[log] val limit: Long = Long.MaxValue) extends MessageSet with Logging {
+                                    private[log] val limit: Long = Long.MaxValue,
+                                    initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {
   
   /* the size of the message set in bytes */
   private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
-    
-  /* set the file position to the last byte in the file */
-  channel.position(channel.size)
-  
+
+  if (initChannelPositionToEnd) {
+    /* set the file position to the last byte in the file */
+    channel.position(channel.size)
+  }
+
   /**
    * Create a file message set with no limit or offset
    */
@@ -59,7 +62,11 @@ class FileMessageSet private[kafka](val 
   * Return a message set which is a view into this set starting from the given position and with the given size limit.
    */
   def read(position: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()))
+    new FileMessageSet(file,
+                       channel,
+                       this.start + position,
+                       scala.math.min(this.start + position + size, sizeInBytes()),
+                       false)
   }
   
   /**
@@ -74,7 +81,8 @@ class FileMessageSet private[kafka](val 
       buffer.rewind()
       channel.read(buffer, position)
       if(buffer.hasRemaining)
-        throw new IllegalStateException("Failed to read complete buffer.")
+        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
+                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
       buffer.rewind()
       val offset = buffer.getLong()
       if(offset >= targetOffset)
@@ -92,7 +100,7 @@ class FileMessageSet private[kafka](val 
     channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel)
   
   /**
-   * Get an iterator over the messages in the set
+   * Get an iterator over the messages in the set. We only do shallow iteration here.
    */
   override def iterator: Iterator[MessageAndOffset] = {
     new IteratorTemplate[MessageAndOffset] {
@@ -133,10 +141,8 @@ class FileMessageSet private[kafka](val 
   /**
    * Append this message to the message set
    */
-  def append(messages: MessageSet): Unit = {
-    var written = 0L
-    while(written < messages.sizeInBytes)
-      written += messages.writeTo(channel, 0, messages.sizeInBytes)
+  def append(messages: ByteBufferMessageSet) {
+    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
     _size.getAndAdd(written)
   }
  

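The point of the new initChannelPositionToEnd flag: read() hands out views that share the
parent set's FileChannel, so constructing a view must not reposition that shared channel as
a side effect; only the original, writable set should seek to the end. A minimal standalone
sketch of the idea (simplified code, not the actual class; the file name is illustrative):

    import java.io.{File, RandomAccessFile}
    import java.nio.ByteBuffer

    val file = new File("segment.log")      // illustrative file name
    val channel = new RandomAccessFile(file, "rw").getChannel
    channel.position(channel.size)          // done once, so appends go to the end

    // A read view uses positional reads and never calls channel.position(),
    // so creating or using it cannot disturb a concurrent append:
    def readAt(position: Long, size: Int): ByteBuffer = {
      val buf = ByteBuffer.allocate(size)
      channel.read(buf, position)           // positional read, no seek
      buf.flip()
      buf
    }
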
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1399934&r1=1399933&r2=1399934&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Fri Oct 19 00:09:32 2012
@@ -268,7 +268,8 @@ private[kafka] class Log(val dir: File, 
             }
           
           // now append to the log
-          trace("Appending message set to " + this.name + ": " + validMessages)
+          trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
+                .format(this.name, offsets._1, nextOffset.get(), validMessages))
           segment.append(offsets._1, validMessages)
           
           // return the offset at which the messages were appended
@@ -315,7 +316,7 @@ private[kafka] class Log(val dir: File, 
         monotonic = false
       // update the last offset seen
       lastOffset = messageAndOffset.offset
-      
+
       // check the validity of the message by checking CRC and message size
       val m = messageAndOffset.message
       m.ensureValid()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1399934&r1=1399933&r2=1399934&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Fri Oct 19 00:09:32 2012
@@ -2,9 +2,8 @@ package kafka.log
 
 import scala.math._
 import java.io.File
-import kafka.common._
 import kafka.message._
-import kafka.utils.{Utils, Range, Time, SystemTime, nonthreadsafe}
+import kafka.utils._
 
 /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
@@ -19,7 +18,7 @@ class LogSegment(val messageSet: FileMes
                  val index: OffsetIndex, 
                  val start: Long, 
                  val indexIntervalBytes: Int,
-                 time: Time) extends Range {
+                 time: Time) extends Range with Logging {
   
   var firstAppendTime: Option[Long] = None
   
@@ -51,6 +50,7 @@ class LogSegment(val messageSet: FileMes
    */
   def append(offset: Long, messages: ByteBufferMessageSet) {
     if (messages.sizeInBytes > 0) {
+      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes()))
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
         index.append(offset, messageSet.sizeInBytes().toInt)

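For context on this hunk: the surrounding append() is what keeps the offset index sparse;
an entry is added only once at least indexIntervalBytes of message data has accumulated
since the last entry. A rough sketch of that bookkeeping, assuming the parts that fall
outside the hunk (the counter declaration and its reset):

    var bytesSinceLastIndexEntry = 0L       // assumed; not visible in the hunk
    def append(offset: Long, messages: ByteBufferMessageSet) {
      if (messages.sizeInBytes > 0) {
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
          // map this offset to the current byte position in the log
          index.append(offset, messageSet.sizeInBytes().toInt)
          bytesSinceLastIndexEntry = 0L
        }
        messageSet.append(messages)         // write the message bytes
        bytesSinceLastIndexEntry += messages.sizeInBytes
      }
    }
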
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1399934&r1=1399933&r2=1399934&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Oct 19 00:09:32 2012
@@ -125,8 +125,11 @@ class ByteBufferMessageSet(@BeanProperty
   
   /** Write the messages in this set to the given channel */
   def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = {
+    // Ignore offset and size from input. We just want to write the whole buffer to the channel.
     buffer.mark()
-    val written = channel.write(buffer)
+    var written = 0L
+    while(written < sizeInBytes)
+      written += channel.write(buffer)
     buffer.reset()
     written
   }

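Why the loop belongs here: a single GatheringByteChannel.write() call may write fewer bytes
than the buffer holds (for socket channels in particular), so looping until the whole set
is written has to happen wherever the buffer is drained; moving it into writeTo gives that
guarantee to every caller, not just FileMessageSet.append. The general NIO pattern, as a
standalone sketch:

    import java.nio.ByteBuffer
    import java.nio.channels.GatheringByteChannel

    // write() returns how many bytes it actually accepted on this call,
    // so loop until the buffer is drained.
    def writeFully(channel: GatheringByteChannel, buffer: ByteBuffer): Long = {
      var written = 0L
      while (buffer.hasRemaining)
        written += channel.write(buffer)
      written
    }
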
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1399934&r1=1399933&r2=1399934&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Fri Oct 19 00:09:32 2012
@@ -46,15 +46,15 @@ class ReplicaFetcherThread(name:String,
 
     if (fetchOffset != replica.logEndOffset)
      throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
-    trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId,
-      replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
+    trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d"
+          .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
     replica.log.get.append(messageSet, assignOffsets = false)
     trace("Follower %d has replica log end offset %d after appending %d bytes of messages"
-      .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
+          .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
     val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
     replica.highWatermark = followerHighWatermark
     trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
-      .format(replica.brokerId, topic, partitionId, followerHighWatermark))
+          .format(replica.brokerId, topic, partitionId, followerHighWatermark))
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset

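Aside from the reindented trace calls, the invariant worth noting sits on the unchanged
line between them: a follower's high watermark may trail, but never lead, both its own log
end offset and the leader's high watermark. In isolation, with illustrative values:

    // Illustrative values; partitionData.hw is the leader hw carried in the
    // fetch response shown in the hunk above.
    val logEndOffset = 120L                                 // follower LEO after the append
    val leaderHw     = 100L                                 // leader high watermark
    val followerHighWatermark = logEndOffset.min(leaderHw)  // 100L
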
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1399934&r1=1399933&r2=1399934&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Fri Oct 19 00:09:32 2012
@@ -47,9 +47,9 @@ object DumpLogSegments {
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
      // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
-      if(entry.offset <= startOffset)
+      if(entry.offset == 0 && i > 0)
         return
-      println("offset: %d position: %d".format(entry.offset, entry.position))
+      println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
     }
   }
   
@@ -61,10 +61,10 @@ object DumpLogSegments {
     var validBytes = 0L
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
-      validBytes += MessageSet.entrySize(msg)
-      print("offset: " + messageAndOffset.offset + " isvalid: " + msg.isValid +
-            " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + 
+      print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+            " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
+      validBytes += MessageSet.entrySize(msg)
       if(msg.hasKey)
         print(" keysize: " + msg.keySize)
       if(printContents) {

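Both fixes in this file concern what the tool prints: entries in the offset index store
offsets relative to the segment's base offset, so the absolute offset is
entry.offset + index.baseOffset, and validBytes is now printed before the current entry's
size is added, so "position" is the byte at which the message starts. With illustrative
values:

    // Hypothetical segment; the base offset comes from the index file name,
    // e.g. 00000000000000368769.index:
    val baseOffset     = 368769L
    val relativeOffset = 42L                          // entry.offset as stored
    val absoluteOffset = baseOffset + relativeOffset  // 368811L, what is printed now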

