kafka-commits mailing list archives

From joest...@apache.org
Subject svn commit: r1351112 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/Log.scala main/scala/kafka/network/BlockingChannel.scala main/scala/kafka/server/KafkaApis.scala test/scala/unit/kafka/log/LogTest.scala
Date Sun, 17 Jun 2012 15:32:30 GMT
Author: joestein
Date: Sun Jun 17 15:32:29 2012
New Revision: 1351112

URL: http://svn.apache.org/viewvc?rev=1351112&view=rev
Log:
KAFKA-348 rebase branch from trunk patch by Jun Rao reviewed by Joe Stein

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1351112&r1=1351111&r2=1351112&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Sun Jun 17 15:32:29 2012
@@ -21,10 +21,11 @@ import kafka.api.OffsetRequest
 import java.io.{IOException, RandomAccessFile, File}
 import java.util.{Comparator, Collections, ArrayList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
-import kafka.message.{MessageSet, InvalidMessageException, FileMessageSet}
 import kafka.utils._
 import java.text.NumberFormat
-import kafka.common.OffsetOutOfRangeException
+import kafka.server.BrokerTopicStat
+import kafka.common.{InvalidMessageSizeException, OffsetOutOfRangeException}
+import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
 
 object Log {
   val FileSuffix = ".kafka"
@@ -214,7 +215,7 @@ private[kafka] class Log(val dir: File, 
   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
    * Returns the offset at which the messages are written.
    */
-  def append(messages: MessageSet): Unit = {
+  def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
@@ -223,13 +224,25 @@ private[kafka] class Log(val dir: File, 
       numberOfMessages += 1;
     }
 
+    BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
+    BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
     
+    // truncate the message set's buffer up to validBytes before appending it to the on-disk log
+    val validByteBuffer = messages.getBuffer.duplicate()
+    val messageSetValidBytes = messages.validBytes
+    if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+        " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+    validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+    val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
     // they are valid, insert them in the log
     lock synchronized {
       try {
         val segment = segments.view.last
-        segment.messageSet.append(messages)
+        segment.messageSet.append(validMessages)
         maybeFlush(numberOfMessages)
         maybeRoll(segment)
       }
@@ -262,10 +275,17 @@ private[kafka] class Log(val dir: File, 
       val deletable = view.takeWhile(predicate)
       for(seg <- deletable)
         seg.deleted = true
-      val numToDelete = deletable.size
+      var numToDelete = deletable.size
       // if we are deleting everything, create a new empty segment
       if(numToDelete == view.size) {
-        roll()
+        if (view(numToDelete - 1).size > 0)
+          roll()
+        else {
+          // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
+          // file name. So simply reuse the last segment and reset the modified time.
+          view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
+          numToDelete -= 1
+        }
       }
       segments.trunc(numToDelete)
     }
@@ -309,9 +329,12 @@ private[kafka] class Log(val dir: File, 
    */
   def roll() {
     lock synchronized {
-      val last = segments.view.last
       val newOffset = nextAppendOffset
       val newFile = new File(dir, nameFromOffset(newOffset))
+      if (newFile.exists) {
+        warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it
first")
+        newFile.delete()
+      }
       debug("Rolling log '" + name + "' to " + newFile.getName())
       segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1351112&r1=1351111&r2=1351112&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Sun Jun 17 15:32:29 2012
@@ -53,6 +53,7 @@ class BlockingChannel( val host: String,
       channel.configureBlocking(true)
       channel.socket.setSoTimeout(readTimeoutMs)
       channel.socket.setKeepAlive(true)
+      channel.socket.setTcpNoDelay(true)
       channel.connect(new InetSocketAddress(host, port))
 
       writeChannel = channel
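
The BlockingChannel change is a single socket option: setTcpNoDelay(true) disables Nagle's algorithm, so small produce/fetch requests go onto the wire immediately instead of being coalesced while the kernel waits for an ACK, trimming request latency. A minimal sketch of the same blocking-channel setup (the host, port, and timeout values are illustrative, not from this commit):

    import java.net.InetSocketAddress
    import java.nio.channels.SocketChannel

    val channel = SocketChannel.open()
    channel.configureBlocking(true)
    channel.socket.setSoTimeout(30000)   // read timeout in ms, illustrative
    channel.socket.setKeepAlive(true)
    channel.socket.setTcpNoDelay(true)   // the new option: send small writes immediately
    channel.connect(new InetSocketAddress("localhost", 9092))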

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1351112&r1=1351111&r2=1351112&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sun Jun 17 15:32:29 2012
@@ -126,6 +126,8 @@ class KafkaApis(val requestChannel: Requ
     for(topicData <- request.data) {
       for(partitionData <- topicData.partitionData) {
         msgIndex += 1
+        BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
+        BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
         try {
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
@@ -136,6 +138,8 @@ class KafkaApis(val requestChannel: Requ
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
         } catch {
           case e =>
+            BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
+            BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
             error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition,
e)
             e match {
               case _: IOException =>
@@ -239,12 +243,16 @@ class KafkaApis(val requestChannel: Requ
      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
         val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
           case Left(err) =>
+            BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
+            BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
             fetchRequest.replicaId match {
               case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
               case _ =>
                 new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
             }
           case Right(messages) =>
+            BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
+            BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
             val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
               " must exist on leader broker %d".format(logManager.config.brokerId))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1351112&r1=1351111&r2=1351112&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Sun Jun 17 15:32:29 2012
@@ -185,7 +185,7 @@ class LogTest extends JUnitSuite {
       val deletedSegments = log.markDeletedWhile(_ => true)
 
       // we shouldn't delete the last empty log segment.
-      assertTrue("We shouldn't delete the last empty log segment", log.segments.view.size == 1)
+      assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
 
       // we now have a new log
       assertEquals(curOffset, log.nextAppendOffset)


