kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1378590 - in /incubator/kafka/trunk/core/src/main/scala/kafka: log/Log.scala server/KafkaRequestHandlers.scala
Date Wed, 29 Aug 2012 14:52:39 GMT
Author: junrao
Date: Wed Aug 29 14:52:39 2012
New Revision: 1378590

URL: http://svn.apache.org/viewvc?rev=1378590&view=rev
Log:
Message size not checked at the server (patch v3); patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-469

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1378590&r1=1378589&r2=1378590&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Wed Aug 29 14:52:39 2012
@@ -255,6 +255,7 @@ private[log] class Log(val dir: File, va
     }
   }
 
+
   /**
    * Read from the log file at the given offset
    */

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1378590&r1=1378589&r2=1378590&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Wed Aug 29 14:52:39 2012
@@ -22,7 +22,7 @@ import kafka.log._
 import kafka.network._
 import kafka.message._
 import kafka.api._
-import kafka.common.ErrorMapping
+import kafka.common.{MessageSizeTooLargeException, ErrorMapping}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils._
 
@@ -73,11 +73,15 @@ private[kafka] class KafkaRequestHandler
       BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
     }
     catch {
-      case e =>
-        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+      case e: MessageSizeTooLargeException =>
+        warn(e.getMessage() + " on " + request.topic + ":" + partition)
+        BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
+        BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+      case t =>
+        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t)
         BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
         BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-        throw e
+        throw t
     }
   }
 



Mime
View raw message