kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1297324 - in /incubator/kafka/trunk/core/src: main/scala/kafka/log/Log.scala test/scala/unit/kafka/log/LogTest.scala
Date Tue, 06 Mar 2012 02:21:54 GMT
Author: junrao
Date: Tue Mar  6 02:21:54 2012
New Revision: 1297324

URL: http://svn.apache.org/viewvc?rev=1297324&view=rev
Log:
broker deletes all file segments when cleaning up an empty log segment; patched by Jun Rao;
reviewed by Neha Narkhede; KAFKA-292

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1297324&r1=1297323&r2=1297324&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Tue Mar  6 02:21:54 2012
@@ -249,10 +249,18 @@ private[log] class Log(val dir: File, va
       val deletable = view.takeWhile(predicate)
       for(seg <- deletable)
         seg.deleted = true
-      val numToDelete = deletable.size
+      var numToDelete = deletable.size
       // if we are deleting everything, create a new empty segment
-      if(numToDelete == view.size)
-        roll()
+      if(numToDelete == view.size) {
+        if (view(numToDelete - 1).size > 0)
+          roll()
+        else {
+          // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
+          // file name. So simply reuse the last segment and reset the modified time.
+          view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
+          numToDelete -=1
+        }
+      }
       segments.trunc(numToDelete)
     }
   }
@@ -290,9 +298,12 @@ private[log] class Log(val dir: File, va
    */
   def roll() {
     lock synchronized {
-      val last = segments.view.last
       val newOffset = nextAppendOffset
       val newFile = new File(dir, Log.nameFromOffset(newOffset))
+      if (newFile.exists) {
+        warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
+        newFile.delete()
+      }
       debug("Rolling log '" + name + "' to " + newFile.getName())
       segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
     }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1297324&r1=1297323&r2=1297324&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Mar  6 02:21:54 2012
@@ -182,7 +182,10 @@ class LogTest extends JUnitSuite {
       assertEquals(curOffset, log.nextAppendOffset)
 
       // time goes by; the log file (which is empty) is deleted again
-      log.markDeletedWhile(_ => true)
+      val deletedSegments = log.markDeletedWhile(_ => true)
+
+      // we shouldn't delete the last empty log segment.
+      assertTrue(deletedSegments.size == 0)
 
       // we now have a new log
       assertEquals(curOffset, log.nextAppendOffset)



Mime
View raw message