kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1392988 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: log/LogManager.scala server/HighwaterMarkCheckpoint.scala
Date Tue, 02 Oct 2012 15:47:10 GMT
Author: junrao
Date: Tue Oct  2 15:47:10 2012
New Revision: 1392988

URL: http://svn.apache.org/viewvc?rev=1392988&view=rev
Log:
server should shut down on encountering invalid highwatermark file; patched by Yang Ye; reviewed
by Jun Rao; kafka-509

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1392988&r1=1392987&r2=1392988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Oct  2 15:47:10 2012
@@ -20,10 +20,10 @@ package kafka.log
 import java.io._
 import kafka.utils._
 import scala.collection._
-import kafka.server.KafkaConfig
-import kafka.api.OffsetRequest
 import kafka.log.Log._
 import kafka.common.{TopicAndPartition, KafkaException}
+import kafka.server.{HighwaterMarkCheckpoint, KafkaConfig}
+
 
 /**
  * The guy who creates and hands out logs
@@ -59,7 +59,10 @@ private[kafka] class LogManager(val conf
   val subDirs = logDir.listFiles()
   if(subDirs != null) {
     for(dir <- subDirs) {
-      if(!dir.isDirectory()) {
+      if(dir.getName.equals(HighwaterMarkCheckpoint.highWatermarkFileName)){
+        // skip valid metadata file
+      }
+      else if(!dir.isDirectory()) {
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala?rev=1392988&r1=1392987&r2=1392988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala Tue Oct  2 15:47:10 2012
@@ -45,12 +45,7 @@ class HighwaterMarkCheckpoint(val path: 
     try {
       // write to temp file and then swap with the highwatermark file
       val tempHwFile = new File(hwFile + ".tmp")
-      // it is an error for this file to be present. It could mean that the previous rename operation failed
-      if(tempHwFile.exists()) {
-        fatal("Temporary high watermark %s file exists. This could mean that the ".format(tempHwFile.getAbsolutePath) +
-          "previous high watermark checkpoint operation has failed.")
-        System.exit(1)
-      }
+
       val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile))
       // checkpoint highwatermark for all partitions
       // write the current version
@@ -69,7 +64,6 @@ class HighwaterMarkCheckpoint(val path: 
       hwFileWriter.flush()
       hwFileWriter.close()
       // swap new high watermark file with previous one
-      hwFile.delete()
       if(!tempHwFile.renameTo(hwFile)) {
         fatal("Attempt to swap the new high watermark file with the old one failed")
         System.exit(1)



Mime
View raw message