kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject kafka git commit: KAFKA-2024 Log compaction can generate unindexable segments.
Date Sat, 04 Apr 2015 23:00:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a686a67f6 -> 7acfa92c0


KAFKA-2024 Log compaction can generate unindexable segments.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7acfa92c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7acfa92c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7acfa92c

Branch: refs/heads/trunk
Commit: 7acfa92c09bbfeb6d28a0771c09d36c9c89fc31f
Parents: a686a67
Author: Rajini Sivaram <rajinisivaram@gmail.com>
Authored: Sat Apr 4 15:56:17 2015 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Sat Apr 4 15:56:17 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  3 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala | 46 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7acfa92c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5991428..12eacdf 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -483,7 +483,8 @@ private[log] class Cleaner(val id: Int,
       segs = segs.tail
       while(!segs.isEmpty &&
             logSize + segs.head.size < maxSize &&
-            indexSize + segs.head.index.sizeInBytes < maxIndexSize) {
+            indexSize + segs.head.index.sizeInBytes < maxIndexSize &&
+            segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.index.sizeInBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/7acfa92c/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index a4da95f..c20e423 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -26,6 +26,7 @@ import scala.collection._
 import kafka.common._
 import kafka.utils._
 import kafka.message._
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * Unit tests for the log cleaning logic
@@ -197,6 +198,51 @@ class CleanerTest extends JUnitSuite {
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
   }
   
+  /**
+   * Validate the logic for grouping log segments together for cleaning when only a small number of
+   * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
+   * contain a range of offsets greater than Int.MaxValue to ensure that relative offsets can be
+   * stored in 4 bytes.
+   */
+  @Test
+  def testSegmentGroupingWithSparseOffsets() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val log = makeLog(config = logConfig.copy(segmentSize = 1024, indexInterval = 1))
+       
+    // fill up first segment
+    while (log.numberOfSegments == 1)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    
+    // forward offset and append message to next segment at offset Int.MaxValue
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(Int.MaxValue-1), new Message("hello".getBytes, "hello".getBytes))
+    log.append(messageSet, assignOffsets = false)
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
+    
+    // grouping should result in a single group with maximum relative offset of Int.MaxValue
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(1, groups.size)
+    
+    // append another message, making last offset of second segment > Int.MaxValue
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    
+    // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(2, groups.size)
+    checkSegmentOrder(groups)
+    
+    // append more messages, creating new segments, further grouping should still occur
+    while (log.numberOfSegments < 4)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(log.numberOfSegments-1, groups.size)
+    for (group <- groups)
+      assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
+    checkSegmentOrder(groups)
+    
+  }
+  
   private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]) {
     val offsets = groups.flatMap(_.map(_.baseOffset))
     assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)


Mime
View raw message