kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1239740 - in /incubator/kafka/trunk/core/src/main/scala/kafka: log/Log.scala server/KafkaRequestHandlers.scala
Date Thu, 02 Feb 2012 17:33:46 GMT
Author: junrao
Date: Thu Feb  2 17:33:46 2012
New Revision: 1239740

URL: http://svn.apache.org/viewvc?rev=1239740&view=rev
Log:
Corrupted request shuts down the broker; patched by Jun Rao; reviewed by Jay Kreps and Neha
Narkhede; KAFKA-261

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1239740&r1=1239739&r2=1239740&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Thu Feb  2 17:33:46 2012
@@ -211,10 +211,18 @@ private[log] class Log(val dir: File, va
     
     // they are valid, insert them in the log
     lock synchronized {
-      val segment = segments.view.last
-      segment.messageSet.append(messages)
-      maybeFlush(numberOfMessages)
-      maybeRoll(segment)
+      try {
+        val segment = segments.view.last
+        segment.messageSet.append(messages)
+        maybeFlush(numberOfMessages)
+        maybeRoll(segment)
+      }
+      catch {
+        case e: IOException =>
+          fatal("Halting due to unrecoverable I/O error while handling producer request", e)
+          Runtime.getRuntime.halt(1)
+        case e2 => throw e2
+      }
     }
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1239740&r1=1239739&r2=1239740&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Thu Feb  2 17:33:46 2012
@@ -25,7 +25,6 @@ import kafka.api._
 import kafka.common.ErrorMapping
 import kafka.utils.SystemTime
 import kafka.utils.Logging
-import java.io.IOException
 
 /**
  * Logic to handle the various Kafka requests
@@ -74,15 +73,8 @@ private[kafka] class KafkaRequestHandler
     catch {
       case e =>
         error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
-        e match {
-          case _: IOException =>
-            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-            Runtime.getRuntime.halt(1)
-          case _ =>
-        }
         throw e
     }
-    None
   }
 
   def handleFetchRequest(request: Receive): Option[Send] = {



Mime
View raw message