kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3584; Fix synchronization issue between deleteOldSegments() and delete() methods
Date Fri, 13 May 2016 11:57:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fb421dbcf -> 1c4b943f2


KAFKA-3584; Fix synchronization issue between deleteOldSegments() and delete() methods

This PR fixes a synchronization issue between the deleteOldSegments() and delete() methods:
calling log.deleteOldSegments() after log.delete() throws a NullPointerException.

cc ijuma junrao

Author: Manikumar reddy O <manikumar.reddy@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1367 from omkreddy/KAFKA-3584


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c4b943f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c4b943f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c4b943f

Branch: refs/heads/trunk
Commit: 1c4b943f2d9cc90101026519769f142c07bc1785
Parents: fb421db
Author: Manikumar reddy O <manikumar.reddy@gmail.com>
Authored: Fri May 13 12:57:09 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri May 13 12:57:09 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 20 +++++++-------
 .../src/test/scala/unit/kafka/log/LogTest.scala | 28 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c4b943f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e0ad73d..a7549dc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -563,21 +563,23 @@ class Log(val dir: File,
    * @return The number of segments deleted
    */
   def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
-    // find any segments that match the user-supplied predicate UNLESS it is the final segment
-    // and it is empty (since we would just end up re-creating it
-    val lastSegment = activeSegment
-    val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
-    val numToDelete = deletable.size
-    if(numToDelete > 0) {
-      lock synchronized {
+    lock synchronized {
+      //find any segments that match the user-supplied predicate UNLESS it is the final segment
+      //and it is empty (since we would just end up re-creating it)
+      val lastEntry = segments.lastEntry
+      val deletable =
+        if (lastEntry == null) Seq.empty
+        else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
+      val numToDelete = deletable.size
+      if (numToDelete > 0) {
         // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
-        if(segments.size == numToDelete)
+        if (segments.size == numToDelete)
           roll()
         // remove the segments for lookups
         deletable.foreach(deleteSegment(_))
       }
+      numToDelete
     }
-    numToDelete
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c4b943f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 796f5c3..f48f6b1 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -930,4 +930,32 @@ class LogTest extends JUnitSuite {
   def topicPartitionName(topic: String, partition: String): String =
     File.separator + topic + "-" + partition
 
+  @Test
+  def testDeleteOldSegmentsMethod() {
+    val set = TestUtils.singleMessageSet("test".getBytes)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    val config = LogConfig(logProps)
+    val log = new Log(logDir,
+      config,
+      recoveryPoint = 0L,
+      time.scheduler,
+      time)
+
+    // append some messages to create some segments
+    for (i <- 0 until 100)
+      log.append(set)
+
+    log.deleteOldSegments(_ => true)
+    assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+
+    // append some messages to create some segments
+    for (i <- 0 until 100)
+      log.append(set)
+
+    log.delete()
+    assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
+    assertEquals("The number of deleted segments shoud be zero.", 0, log.deleteOldSegments(_ => true))
+  }
 }


Mime
View raw message