kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3105: Use `Utils.atomicMoveWithFallback` instead of `File.rename`
Date Mon, 18 Jan 2016 17:47:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 60a5a523b -> 0c32bc992


KAFKA-3105: Use `Utils.atomicMoveWithFallback` instead of `File.rename`

It behaves better on Windows and provides more useful error messages.

Also:
* Minor inconsistency fix in `kafka.server.OffsetCheckpoint`.
* Remove delete from `streams.state.OffsetCheckpoint` constructor (similar to the change in
`kafka.server.OffsetCheckpoint` in https://github.com/apache/kafka/commit/836cb1963330a9e342379899e0fe52b72347736e#diff-2503b32f29cbbd61ed8316f127829455L29).

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #771 from ijuma/kafka-3105-use-atomic-move-with-fallback-instead-of-rename


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c32bc99
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c32bc99
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c32bc99

Branch: refs/heads/trunk
Commit: 0c32bc99265c4645fad2e3244dc2c697bfd9a229
Parents: 60a5a52
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Jan 18 09:47:32 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jan 18 09:47:32 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/log/FileMessageSet.scala   | 10 +++++-----
 core/src/main/scala/kafka/log/LogSegment.scala  | 20 +++++++++++++-------
 core/src/main/scala/kafka/log/OffsetIndex.scala | 11 ++++++-----
 .../kafka/server/BrokerMetadataCheckpoint.scala | 10 ++--------
 .../scala/kafka/server/OffsetCheckpoint.scala   |  4 ++--
 .../test/scala/unit/kafka/log/CleanerTest.scala |  7 ++++---
 .../kafka/streams/state/OffsetCheckpoint.java   | 16 +++-------------
 7 files changed, 35 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index b239a6c..d4ce498 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.network.TransportLayer
+import org.apache.kafka.common.utils.Utils
 
 /**
 * An on-disk message set. An optional start and end position can be applied to the message set
@@ -291,12 +292,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
 
   /**
    * Rename the file that backs this message set
-   * @return true iff the rename was successful
+   * @throws IOException if rename fails.
    */
-  def renameTo(f: File): Boolean = {
-    val success = this.file.renameTo(f)
-    this.file = f
-    success
+  def renameTo(f: File) {
+    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+    finally this.file = f
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d604d6c..aa37d52 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -23,7 +23,7 @@ import kafka.server.{LogOffsetMetadata, FetchDataInfo}
 import org.apache.kafka.common.errors.CorruptRecordException
 
 import scala.math._
-import java.io.File
+import java.io.{IOException, File}
 
 
  /**
@@ -256,12 +256,18 @@ class LogSegment(val log: FileMessageSet,
    * Change the suffix for the index and log file for this log segment
    */
   def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
-    val logRenamed = log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
-    if(!logRenamed)
-      throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
-    val indexRenamed = index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
-    if(!indexRenamed)
-      throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
+
+    def kafkaStorageException(fileType: String, e: IOException) =
+      new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)
+
+    try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
+    catch {
+      case e: IOException => throw kafkaStorageException("log", e)
+    }
+    try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
+    catch {
+      case e: IOException => throw kafkaStorageException("index", e)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 84d18bd..e95c9d1 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import org.apache.kafka.common.utils.Utils
+
 import scala.math._
 import java.io._
 import java.nio._
@@ -338,12 +340,11 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val
maxIndexSi
   
   /**
    * Rename the file that backs this offset index
-   * @return true iff the rename was successful
+   * @throws IOException if rename fails
    */
-  def renameTo(f: File): Boolean = {
-    val success = this.file.renameTo(f)
-    this.file = f
-    success
+  def renameTo(f: File) {
+    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+    finally this.file = f
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 6e8d68d..00e5d0c 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -43,16 +43,10 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
         fileOutputStream.flush()
         fileOutputStream.getFD().sync()
         fileOutputStream.close()
-        // swap new BrokerMetadata file with previous one
-        if(!temp.renameTo(file)) {
-          // renameTo() fails on windows if destination file exists.
-          file.delete()
-          if(!temp.renameTo(file))
-            throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath(), file.getAbsolutePath()))
-        }
+        Utils.atomicMoveWithFallback(temp.toPath, file.toPath)
       } catch {
         case ie: IOException =>
-          error("Failed to write meta.properties due to ",ie)
+          error("Failed to write meta.properties due to", ie)
           throw ie
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index fe1d823..77f283c 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -71,7 +71,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
   def read(): Map[TopicAndPartition, Long] = {
 
     def malformedLineException(line: String) =
-      throw new IOException(s"Malformed line in offset checkpoint file: $line'")
+      new IOException(s"Malformed line in offset checkpoint file: $line'")
 
     lock synchronized {
       val reader = new BufferedReader(new FileReader(file))
@@ -104,7 +104,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
             throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
         }
       } catch {
-        case e: NumberFormatException => malformedLineException(line)
+        case e: NumberFormatException => throw malformedLineException(line)
       } finally {
         reader.close()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8ab9f91..a8092de 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -19,6 +19,7 @@ package kafka.log
 
 import java.io.File
 import java.nio._
+import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
 
@@ -376,7 +377,7 @@ class CleanerTest extends JUnitSuite {
     //    On recovery, clean operation is aborted. All messages should be present in the log
     log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
     for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
-      file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
     }
     log = recoverAndCheck(config, allKeys)
     
@@ -388,7 +389,7 @@ class CleanerTest extends JUnitSuite {
     //    renamed to .deleted. Clean operation is resumed during recovery. 
     log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
     for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
-      file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
     }   
     log = recoverAndCheck(config, cleanedKeys)
     
@@ -478,4 +479,4 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
   
   def size: Int = map.size
   
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
index e04de68..d748aac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -55,7 +56,6 @@ public class OffsetCheckpoint {
     private final Object lock;
 
     public OffsetCheckpoint(File file) throws IOException {
-        new File(file + ".tmp").delete(); // try to delete any existing temp files for cleanliness
         this.file = file;
         this.lock = new Object();
     }
@@ -71,26 +71,16 @@ public class OffsetCheckpoint {
                 writeIntLine(writer, VERSION);
                 writeIntLine(writer, offsets.size());
 
-                // write the entries
                 for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
                     writeEntry(writer, entry.getKey(), entry.getValue());
 
-                // flush the buffer and then fsync the underlying file
                 writer.flush();
                 fileOutputStream.getFD().sync();
             } finally {
                 writer.close();
             }
 
-            // swap new offset checkpoint file with previous one
-            if (!temp.renameTo(file)) {
-                // renameTo() fails on Windows if the destination file exists.
-                file.delete();
-                if (!temp.renameTo(file))
-                    throw new IOException(String.format("File rename from %s to %s failed.",
-                        temp.getAbsolutePath(),
-                        file.getAbsolutePath()));
-            }
+            Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
         }
     }
 
@@ -122,7 +112,7 @@ public class OffsetCheckpoint {
                 switch (version) {
                     case 0:
                         int expectedSize = readInt(reader);
-                        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
+                        Map<TopicPartition, Long> offsets = new HashMap<>();
                         String line = reader.readLine();
                         while (line != null) {
                             String[] pieces = line.split("\\s+");


Mime
View raw message