kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1393906 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: common/KafkaStorageException.scala log/Log.scala server/KafkaApis.scala
Date Thu, 04 Oct 2012 03:46:45 GMT
Author: junrao
Date: Thu Oct  4 03:46:44 2012
New Revision: 1393906

URL: http://svn.apache.org/viewvc?rev=1393906&view=rev
Log:
log.append() should halt on IOException; patched by Jun Rao; reviewed by Jay Kreps; KAFKA-540

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaStorageException.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaStorageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaStorageException.scala?rev=1393906&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaStorageException.scala
(added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaStorageException.scala
Thu Oct  4 03:46:44 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Kafka exception caused by real IOs.
+*/
+class KafkaStorageException(message: String, t: Throwable) extends RuntimeException(message,
t) {
+  def this(message: String) = this(message, null)
+  def this(t: Throwable) = this("", t)
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1393906&r1=1393905&r2=1393906&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Oct  4 03:46:44
2012
@@ -25,9 +25,9 @@ import kafka.utils._
 import java.text.NumberFormat
 import kafka.server.BrokerTopicStat
 import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
-import kafka.common.{KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
+import kafka.common.{KafkaStorageException, KafkaException, InvalidMessageSizeException,
OffsetOutOfRangeException}
 
 object Log {
   val FileSuffix = ".kafka"
@@ -276,8 +276,7 @@ private[kafka] class Log( val dir: File,
       }
       catch {
         case e: IOException =>
-          fatal("Halting due to unrecoverable I/O error while handling producer request",
e)
-          Runtime.getRuntime.halt(1)
+          throw new KafkaStorageException("IO exception in log append", e)
         case e2 => throw e2
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1393906&r1=1393905&r2=1393906&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Oct
 4 03:46:44 2012
@@ -183,14 +183,13 @@ class KafkaApis(val requestChannel: Requ
           BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
           error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition),
e)
           e match {
-            case _: IOException =>
-              fatal("Halting due to unrecoverable I/O error while handling producer request:
" + e.getMessage, e)
-              // compiler requires scala.sys.exit (not System.exit).
-              exit(1)
+            case _: KafkaStorageException =>
+              fatal("Halting due to unrecoverable I/O error while handling producer request",
e)
+              Runtime.getRuntime.halt(1)
             case _ =>
-              val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1L)
-              (TopicAndPartition(topic, partitionData.partition), ProducerResponseStatus(error,
offset))
           }
+          val (errorCode, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1L)
+          (TopicAndPartition(topic, partitionData.partition), ProducerResponseStatus(errorCode,
offset))
       }
     }
     )
@@ -369,10 +368,6 @@ class KafkaApis(val requestChannel: Requ
         }
         (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
       } catch {
-        case ioe: IOException =>
-          fatal("Halting due to unrecoverable I/O error while handling offset request: "
+ ioe.getMessage, ioe)
-          // compiler requires scala.sys.exit (not System.exit).
-          exit(1)
         case e =>
           warn("Error while responding to offset request", e)
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
Nil) )



Mime
View raw message