kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-1409; oversized messages can slow down the brokers; patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao
Date Fri, 25 Apr 2014 18:49:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e69c61327 -> 0bd4c87f9


kafka-1409; oversized messages can slow down the brokers; patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bd4c87f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bd4c87f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bd4c87f

Branch: refs/heads/trunk
Commit: 0bd4c87f9f88fe7f011c7eefc0410132fcf30ff2
Parents: e69c613
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Fri Apr 25 11:49:02 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Apr 25 11:49:02 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 39 ++++++------
 core/src/main/scala/kafka/log/Log.scala         | 62 ++++++++++++++------
 .../main/scala/kafka/message/MessageSet.scala   | 12 +---
 .../src/main/scala/kafka/server/KafkaApis.scala | 27 ++++-----
 .../kafka/server/KafkaRequestHandler.scala      |  2 +-
 5 files changed, 79 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0bd4c87f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c08eab0..518d2df 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,22 +16,23 @@
  */
 package kafka.cluster
 
-import scala.collection._
+import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils._
-import java.lang.Object
+import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
 import kafka.server.{OffsetManager, ReplicaManager}
-import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
-import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
-import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping}
+
 import java.io.IOException
+import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.Some
-import kafka.common.TopicAndPartition
+import scala.collection._
+
+import com.yammer.metrics.core.Gauge
 
 
 /**
@@ -48,7 +49,7 @@ class Partition(val topic: String,
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   private val assignedReplicaMap = new Pool[Int,Replica]
-  private val leaderIsrUpdateLock = new Object
+  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
@@ -72,7 +73,7 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           inSyncReplicas.size < assignedReplicas.size
@@ -114,7 +115,7 @@ class Partition(val topic: String,
   }
 
   def leaderReplicaIfLocal(): Option[Replica] = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIdOpt match {
         case Some(leaderReplicaId) =>
           if (leaderReplicaId == localBrokerId)
@@ -140,7 +141,7 @@ class Partition(val topic: String,
 
   def delete() {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
@@ -155,7 +156,7 @@ class Partition(val topic: String,
   }
 
   def getLeaderEpoch(): Int = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       return this.leaderEpoch
     }
   }
@@ -167,7 +168,7 @@ class Partition(val topic: String,
   def makeLeader(controllerId: Int,
                  partitionStateInfo: PartitionStateInfo, correlationId: Int,
                  offsetManager: OffsetManager): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -200,7 +201,7 @@ class Partition(val topic: String,
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo,
                    correlationId: Int, offsetManager: OffsetManager): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -234,7 +235,7 @@ class Partition(val topic: String,
   }
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId,
offset, topic, partitionId))
       val replicaOpt = getReplica(replicaId)
       if(!replicaOpt.isDefined) {
@@ -270,7 +271,7 @@ class Partition(val topic: String,
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           val numAcks = inSyncReplicas.count(r => {
@@ -314,7 +315,7 @@ class Partition(val topic: String,
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagMessages: Long) {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -356,7 +357,7 @@ class Partition(val topic: String,
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
         case Some(leaderReplica) =>
@@ -402,7 +403,7 @@ class Partition(val topic: String,
   }
 
   override def toString(): String = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       val partitionString = new StringBuilder
       partitionString.append("Topic: " + topic)
       partitionString.append("; Partition: " + partitionId)

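A word on the locking change above: Partition previously serialized every accessor on a single object monitor, so one slow appendMessagesToLeader() call (for instance, one re-compressing a large message set) blocked every reader. With the ReentrantReadWriteLock, read-mostly paths such as isUnderReplicated() and leaderReplicaIfLocal() share a read lock, and only leadership/ISR mutations (makeLeader(), makeFollower(), maybeShrinkIsr(), delete()) take the write lock; appendMessagesToLeader() itself only needs the read lock because Log has its own internal synchronization. A minimal sketch of the inLock helper imported from kafka.utils.Utils (the body here is an assumption inferred from its usage in this diff):

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

    object LockingSketch {
      // Acquire the lock, evaluate the body, and always release it in finally.
      def inLock[T](lock: Lock)(fun: => T): T = {
        lock.lock()
        try {
          fun
        } finally {
          lock.unlock()
        }
      }

      val leaderIsrUpdateLock = new ReentrantReadWriteLock()

      // Readers can run concurrently; ISR updates would use writeLock() instead.
      def isUnderReplicatedSketch(isrSize: Int, assignedSize: Int): Boolean =
        inLock(leaderIsrUpdateLock.readLock()) { isrSize < assignedSize }
    }
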
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bd4c87f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 46df8d9..f20c232 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,15 +17,18 @@
 
 package kafka.log
 
-import java.io.{IOException, File}
-import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
-import java.util.concurrent.atomic._
 import kafka.utils._
-import scala.collection.JavaConversions
-import java.text.NumberFormat
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.BrokerTopicStats
+
+import java.io.{IOException, File}
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic._
+import java.text.NumberFormat
+import scala.collection.JavaConversions
+
 import com.yammer.metrics.core.Gauge
 
 
@@ -235,7 +238,7 @@ class Log(val dir: File,
       return appendInfo
       
     // trim any invalid bytes or partial messages before appending it to the on-disk log
-    var validMessages = trimInvalidBytes(messages)
+    var validMessages = trimInvalidBytes(messages, appendInfo)
 
     try {
       // they are valid, insert them in the log
@@ -246,7 +249,7 @@ class Log(val dir: File,
         val segment = maybeRoll()
 
         if(assignOffsets) {
-          // assign offsets to the messageset
+          // assign offsets to the message set
           val offset = new AtomicLong(nextOffset.get)
           try {
             validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
@@ -260,12 +263,16 @@ class Log(val dir: File,
             throw new IllegalArgumentException("Out of order offsets found in " + messages)
         }
 
-        // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
-        // happens with the new message size (after re-compression, if any)
+        // re-validate message sizes since after re-compression some may exceed the limit
         for(messageAndOffset <- validMessages.shallowIterator) {
-          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
+          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+            // we record the original message set size instead of trimmed size
+            // to be consistent with pre-compression bytesRejectedRate recording
+            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
+            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
             throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
               .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+          }
         }
 
         // now append to the log
@@ -287,18 +294,22 @@ class Log(val dir: File,
     }
   }
   
-  /** Struct to hold various quantities we compute about each message set before appending to the log
+  /**
+   * Struct to hold various quantities we compute about each message set before appending to the log
    * @param firstOffset The first offset in the message set
    * @param lastOffset The last offset in the message set
+   * @param shallowCount The number of shallow messages
+   * @param validBytes The number of valid bytes
    * @param codec The codec used in the message set
    * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
    */
-  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean)
+  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
   
   /**
    * Validate the following:
    * <ol>
    * <li> each message matches its CRC
+   * <li> each message size is valid
    * </ol>
    * 
    * Also compute the following quantities:
@@ -306,12 +317,14 @@ class Log(val dir: File,
    * <li> First offset in the message set
    * <li> Last offset in the message set
    * <li> Number of messages
+   * <li> Number of valid bytes
    * <li> Whether the offsets are monotonically increasing
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
   private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
-    var messageCount = 0
+    var shallowMessageCount = 0
+    var validBytesCount = 0
     var firstOffset, lastOffset = -1L
     var codec: CompressionCodec = NoCompressionCodec
     var monotonic = true
@@ -325,25 +338,38 @@ class Log(val dir: File,
       // update the last offset seen
       lastOffset = messageAndOffset.offset
 
-      // check the validity of the message by checking CRC
       val m = messageAndOffset.message
+
+      // Check if the message sizes are valid.
+      val messageSize = MessageSet.entrySize(m)
+      if(messageSize > config.maxMessageSize) {
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+          .format(messageSize, config.maxMessageSize))
+      }
+
+      // check the validity of the message by checking CRC
       m.ensureValid()
-      messageCount += 1;
+
+      shallowMessageCount += 1
+      validBytesCount += messageSize
       
       val messageCodec = m.compressionCodec
       if(messageCodec != NoCompressionCodec)
         codec = messageCodec
     }
-    LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic)
   }
   
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
    * @param messages The message set to trim
+   * @param info The general information of the message set
    * @return A trimmed message set. This may be the same as what was passed in or it may not.
    */
-  private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
-    val messageSetValidBytes = messages.validBytes
+  private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
+    val messageSetValidBytes = info.validBytes
     if(messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
     if(messageSetValidBytes == messages.sizeInBytes) {

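The heart of the fix is in analyzeAndValidateMessageSet() above: each shallow message's size is now checked against config.maxMessageSize before the CRC check and before any offset assignment or re-compression, so an oversized batch is rejected up front instead of after the broker has already spent CPU on it. The check inside append() is kept as a second pass because re-compression after offset assignment can push a message over the limit. As a side effect, trimInvalidBytes() now reuses info.validBytes rather than re-scanning the buffer. A condensed, self-contained sketch of the early rejection (hypothetical simplified types; plain entry sizes stand in for MessageSet.entrySize):

    // Simplified model of the new validation loop in analyzeAndValidateMessageSet().
    class MessageTooLargeSketch(size: Int, max: Int) extends RuntimeException(
      s"Message size is $size bytes which exceeds the maximum configured message size of $max.")

    def analyzeSketch(entrySizes: Seq[Int], maxMessageSize: Int): (Int, Int) = {
      var shallowCount = 0
      var validBytes = 0
      for (size <- entrySizes) {
        if (size > maxMessageSize)
          throw new MessageTooLargeSketch(size, maxMessageSize) // reject before CRC / re-compression work
        shallowCount += 1
        validBytes += size
      }
      (shallowCount, validBytes) // becomes LogAppendInfo.shallowCount and LogAppendInfo.validBytes
    }
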
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bd4c87f/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index a1b5c63..f1b8432 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -80,17 +80,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
    * Gives the total size of this message set in bytes
    */
   def sizeInBytes: Int
-  
-  /**
-   * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't
-   * match the payload for any message.
-   */
-  def validate(): Unit = {
-    for(messageAndOffset <- this)
-      if(!messageAndOffset.message.isValid)
-        throw new InvalidMessageException
-  }
-  
+
   /**
    * Print this message set's contents. If the message set has more than 100 messages, just
    * print the first 100.

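The deleted MessageSet.validate() was the old whole-set CRC pass; with this patch the per-message m.ensureValid() call in Log.analyzeAndValidateMessageSet() is the single place checksums get verified. Roughly, that check recomputes a CRC over the message body and compares it to the stored value, along these lines (a simplified stand-in, not the actual Message internals):

    import java.util.zip.CRC32

    // Hypothetical stand-in for Message.ensureValid(): recompute the checksum
    // over the payload and fail fast on a mismatch.
    def ensureValidSketch(storedCrc: Long, payload: Array[Byte]): Unit = {
      val crc = new CRC32
      crc.update(payload, 0, payload.length)
      if (crc.getValue != storedCrc)
        throw new IllegalStateException("Message is corrupt (CRC mismatch)")
    }
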
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bd4c87f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb0359d..0b668f2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,19 +17,21 @@
 
 package kafka.server
 
-import kafka.admin.AdminUtils
 import kafka.api._
+import kafka.common._
+import kafka.log._
 import kafka.message._
 import kafka.network._
-import kafka.log._
-import scala.collection._
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
+import kafka.admin.AdminUtils
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common._
-import kafka.utils.{Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController
+import kafka.utils.{Pool, SystemTime, Logging}
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
+import scala.collection._
+
 import org.I0Itec.zkclient.ZkClient
 
 /**
@@ -284,10 +286,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
-      // update stats for incoming bytes rate
-      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-      BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
-
       try {
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val info =
@@ -297,11 +295,12 @@ class KafkaApis(val requestChannel: RequestChannel,
               .format(topicAndPartition, brokerId))
 
           }
+
         val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
 
-        // update stats for successfully appended messages
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).logBytesAppendRate.mark(messages.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats.logBytesAppendRate.mark(messages.sizeInBytes)
+        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
         BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 

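Note what moved in KafkaApis: bytesInRate used to be marked as soon as a produce request arrived, so oversized batches that were subsequently rejected still inflated the incoming-bytes metric. It is now marked only after a successful append, next to messagesInRate, while rejected bytes are counted by the new bytesRejectedRate meter marked inside Log. A toy sketch of the resulting accounting (hypothetical counters in place of the yammer meters):

    import java.util.concurrent.atomic.AtomicLong

    val bytesIn       = new AtomicLong // marked only after a successful append (KafkaApis)
    val bytesRejected = new AtomicLong // marked when a message exceeds maxMessageSize (Log)

    def appendAccountingSketch(sizeInBytes: Long, maxMessageSize: Long): Unit = {
      if (sizeInBytes > maxMessageSize) {
        bytesRejected.addAndGet(sizeInBytes)
        throw new IllegalArgumentException("message set contains a message that is too large")
      }
      bytesIn.addAndGet(sizeInBytes)
    }
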
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bd4c87f/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f11f6e2..00bcc06 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -97,7 +97,7 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
   val messagesInRate = newMeter(name + "MessagesInPerSec",  "messages", TimeUnit.SECONDS)
   val bytesInRate = newMeter(name + "BytesInPerSec",  "bytes", TimeUnit.SECONDS)
   val bytesOutRate = newMeter(name + "BytesOutPerSec",  "bytes", TimeUnit.SECONDS)
-  val logBytesAppendRate = newMeter(name + "LogBytesAppendedPerSec",  "bytes", TimeUnit.SECONDS)
+  val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec",  "bytes", TimeUnit.SECONDS)
   val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec",  "requests", TimeUnit.SECONDS)
   val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
 }
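
One operational consequence of the rename above: the LogBytesAppendedPerSec meter disappears and BytesRejectedPerSec replaces it, so any dashboard or alert bound to the old MBean name needs updating. For reference, a standalone registration equivalent to the newMeter call, written against the yammer-metrics 2.x API that KafkaMetricsGroup wraps (the group/type names here are illustrative):

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.MetricName

    // Register a per-second "bytes" meter; mark() feeds its rate.
    val bytesRejectedRate = Metrics.defaultRegistry().newMeter(
      new MetricName("kafka.server", "BrokerTopicMetrics", "BytesRejectedPerSec"),
      "bytes", TimeUnit.SECONDS)

    bytesRejectedRate.mark(4096) // record 4 KiB of rejected message bytes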

