kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1397190 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/Log.scala main/scala/kafka/server/KafkaApis.scala main/scala/kafka/server/ReplicaFetcherThread.scala test/scala/unit/kafka/log/LogTest.scala
Date Thu, 11 Oct 2012 17:40:00 GMT
Author: jkreps
Date: Thu Oct 11 17:40:00 2012
New Revision: 1397190

URL: http://svn.apache.org/viewvc?rev=1397190&view=rev
Log:
KAFKA-557: Avoid assigning new offsets in Log.append for messages appended as part of replication; follower replicas now keep the offsets already assigned by the leader. Reviewed by Neha.


Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1397190&r1=1397189&r2=1397190&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Oct 11 17:40:00 2012
@@ -227,67 +227,114 @@ private[kafka] class Log(val dir: File, 
 
   /**
    * Append this message set to the active segment of the log, rolling over to a fresh segment
if necessary.
+   * 
+   * This method will generally be responsible for assigning offsets to the messages, 
+   * however if the assignOffsets=false flag is passed we will only check that the existing
offsets are valid.
+   * 
    * Returns a tuple containing (first_offset, last_offset) for the newly appended of the
message set, 
    * or (-1,-1) if the message set is empty
    */
-  def append(messages: ByteBufferMessageSet): (Long, Long) = {
-    // check that all messages are valid and see if we have any compressed messages
-    var messageCount = 0
-    var codec: CompressionCodec = NoCompressionCodec
-    for(messageAndOffset <- messages.shallowIterator) {
-      val m = messageAndOffset.message
-      m.ensureValid()
-      if(MessageSet.entrySize(m) > maxMessageSize)
-        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the
maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
-      messageCount += 1;
-      val messageCodec = m.compressionCodec
-      if(messageCodec != NoCompressionCodec)
-        codec = messageCodec
-    }
-
+  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long)
= {
+    val messageSetInfo = analyzeAndValidateMessageSet(messages)
+    
     // if we have any valid messages, append them to the log
-    if(messageCount == 0) {
+    if(messageSetInfo.count == 0) {
       (-1L, -1L)
     } else {
-      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageCount)
-      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageCount)
+      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
+      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
 
       // trim any invalid bytes or partial messages before appending it to the on-disk log
       var validMessages = trimInvalidBytes(messages)
 
-      // they are valid, insert them in the log
-      lock synchronized {
-        try {
-          val firstOffset = nextOffset.get
-          
-          // maybe roll the log
+      try {
+        // they are valid, insert them in the log
+        val offsets = lock synchronized {
+          // maybe roll the log if this segment is full
           val segment = maybeRoll(segments.view.last)
           
-          // assign offsets to the messages
-          validMessages = validMessages.assignOffsets(nextOffset, codec)
+          // assign offsets to the messageset
+          val offsets = 
+            if(assignOffsets) {
+              val firstOffset = nextOffset.get
+              validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec)
+              val lastOffset = nextOffset.get - 1
+              (firstOffset, lastOffset)
+            } else {
+              if(!messageSetInfo.offsetsMonotonic)
+                throw new IllegalArgumentException("Out of order offsets found in " + messages)
+              nextOffset.set(messageSetInfo.lastOffset + 1)
+              (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
+            }
           
-          trace("Appending message set to " + this.name + ": " + validMessages)
-            
           // now append to the log
-          segment.append(firstOffset, validMessages)
-          val lastOffset = nextOffset.get - 1
-          
-          // maybe flush the log and index
-          maybeFlush(messageCount)
+          trace("Appending message set to " + this.name + ": " + validMessages)
+          segment.append(offsets._1, validMessages)
           
           // return the offset at which the messages were appended
-          (firstOffset, lastOffset)
-        } catch {
-          case e: IOException => throw new KafkaStorageException("I/O exception in append
to log '%s'".format(name), e)
+          offsets
         }
+        
+        // maybe flush the log and index
+        maybeFlush(messageSetInfo.count)
+        
+        // return the first and last offset
+        offsets
+      } catch {
+        case e: IOException => throw new KafkaStorageException("I/O exception in append
to log '%s'".format(name), e)
       }
     }
   }
   
+  /* struct to hold various quantities we compute about each message set before appending
to the log */
+  case class MessageSetAppendInfo(firstOffset: Long, lastOffset: Long, codec: CompressionCodec,
count: Int, offsetsMonotonic: Boolean)
+  
+  /**
+   * Validate the following:
+   * 1. each message is not too large
+   * 2. each message matches its CRC
+   * 
+   * Also compute the following quantities:
+   * 1. First offset in the message set
+   * 2. Last offset in the message set
+   * 3. Number of messages
+   * 4. Whether the offsets are monotonically increasing
+   * 5. Whether any compression codec is used (if many are used, then the last one is given)
+   */
+  private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo
= {
+    var messageCount = 0
+    var firstOffset, lastOffset = -1L
+    var codec: CompressionCodec = NoCompressionCodec
+    var monotonic = true
+    for(messageAndOffset <- messages.shallowIterator) {
+      // update the first offset if on the first message
+      if(firstOffset < 0)
+        firstOffset = messageAndOffset.offset
+      // check that offsets are monotonically increasing
+      if(lastOffset >= messageAndOffset.offset)
+        monotonic = false
+      // update the last offset seen
+      lastOffset = messageAndOffset.offset
+      
+      // check the validity of the message by checking CRC and message size
+      val m = messageAndOffset.message
+      m.ensureValid()
+      if(MessageSet.entrySize(m) > maxMessageSize)
+        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the
maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
+      
+      messageCount += 1;
+      
+      val messageCodec = m.compressionCodec
+      if(messageCodec != NoCompressionCodec)
+        codec = messageCodec
+    }
+    MessageSetAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
+  }
+  
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
    */
-  def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
+  private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes
+ " Message set cannot be appended to log. Possible causes are corrupted produce requests")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1397190&r1=1397189&r2=1397190&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Oct 11 17:40:00 2012
@@ -189,7 +189,7 @@ class KafkaApis(val requestChannel: Requ
       try {
         val localReplica = replicaManager.getLeaderReplicaIfLocal(key.topic, key.partition)
         val log = localReplica.log.get
-        val (start, end) = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+        val (start, end) = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet],
assignOffsets = true)
         // we may need to increment high watermark since ISR could be down to 1
         localReplica.partition.maybeIncrementLeaderHW(localReplica)
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset
%d"

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1397190&r1=1397189&r2=1397190&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Thu Oct 11 17:40:00 2012
@@ -39,7 +39,7 @@ class ReplicaFetcherThread(name:String, 
       throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset =
%d.".format(fetchOffset, replica.logEndOffset))
     trace("Follower %d has replica log end offset %d. Received %d messages and leader hw
%d".format(replica.brokerId,
       replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
-    replica.log.get.append(messageSet)
+    replica.log.get.append(messageSet, assignOffsets = false)
     trace("Follower %d has replica log end offset %d after appending %d bytes of messages"
       .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
     val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1397190&r1=1397189&r2=1397190&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Thu Oct 11 17:40:00 2012
@@ -19,6 +19,7 @@ package kafka.log
 
 import java.io._
 import java.util.ArrayList
+import java.util.concurrent.atomic._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
@@ -341,6 +342,31 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change offset", 0, log.logEndOffset)
     assertEquals("Should change log size", log.size, 0)
   }
+  
+  @Test
+  def testAppendWithoutOffsetAssignment() {
+    for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
+      logDir.mkdir()
+      var log = new Log(logDir, 
+                        maxLogFileSize = 64*1024, 
+                        maxMessageSize = config.maxMessageSize, 
+                        maxIndexSize = 1000, 
+                        indexIntervalBytes = 10000, 
+                        needsRecovery = true)
+      val messages = List("one", "two", "three", "four", "five", "six")
+      val ms = new ByteBufferMessageSet(compressionCodec = codec, 
+                                        offsetCounter = new AtomicLong(5), 
+                                        messages = messages.map(s => new Message(s.getBytes)):_*)
+      val firstOffset = ms.shallowIterator.toList.head.offset
+      val lastOffset = ms.shallowIterator.toList.last.offset
+      val (first, last) = log.append(ms, assignOffsets = false)
+      assertEquals(last + 1, log.logEndOffset)
+      assertEquals(firstOffset, first)
+      assertEquals(lastOffset, last)
+      assertTrue(log.read(5, 64*1024).size > 0)
+      log.delete()
+    }
+  }
 
   @Test
   def testReopenThenTruncate() {



Mime
View raw message