kafka-commits mailing list archives

From: jun...@apache.org
Subject: kafka git commit: KAFKA-4451; Fix OffsetIndex overflow when replicating a highly compacted topic. OffsetIndex provides checks for overflow, used when deciding to roll a LogSegment
Date: Thu, 15 Dec 2016 23:46:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 908b6d114 -> e36e8079d


KAFKA-4451; Fix OffsetIndex overflow when replicating a highly compacted topic. OffsetIndex provides checks for overflow, used when deciding to roll a LogSegment

Fix OffsetIndex overflow when replicating a highly compacted topic.
https://issues.apache.org/jira/browse/KAFKA-4451

Author: Michael Schiff <schiff.michael@gmail.com>
Author: Michael Schiff <michael.schiff@tubemogul.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2210 from michaelschiff/bug/4451
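
For context: OffsetIndex stores each entry's offset as a 4-byte value relative to the segment's
base offset, so an append is only safe when (offset - baseOffset) <= Integer.MAX_VALUE. A minimal
Scala sketch of that check and the resulting roll decision (illustration only, not the committed
code; the real implementation is in the LogSegment and Log changes below):

// Sketch of the relative-offset overflow this commit guards against (not Kafka source).
object RelativeOffsetOverflow {
  // OffsetIndex entries hold (offset - baseOffset) in 4 bytes, so the difference must fit in an Int.
  def canConvertToRelativeOffset(baseOffset: Long, offset: Long): Boolean =
    offset - baseOffset <= Int.MaxValue

  def main(args: Array[String]): Unit = {
    val baseOffset = 0L                        // segment created at offset 0
    val nextOffset = Int.MaxValue.toLong + 2   // possible after heavy compaction
    // Previously the follower appended nextOffset into the segment based at 0 and the index
    // entry overflowed; with this fix the append is refused and the log rolls a new segment first.
    println(canConvertToRelativeOffset(baseOffset, nextOffset))  // false => roll
  }
}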


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e36e8079
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e36e8079
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e36e8079

Branch: refs/heads/trunk
Commit: e36e8079d8a005550c5281ecebb4b7b622cc41b1
Parents: 908b6d1
Author: Michael Schiff <schiff.michael@gmail.com>
Authored: Thu Dec 15 15:46:26 2016 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Dec 15 15:46:26 2016 -0800

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      |  9 ++++-
 core/src/main/scala/kafka/log/Log.scala         | 35 ++++++++++++-----
 core/src/main/scala/kafka/log/LogCleaner.scala  |  8 ++--
 core/src/main/scala/kafka/log/LogSegment.scala  | 13 +++++--
 core/src/main/scala/kafka/log/OffsetIndex.scala |  2 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 38 +++++++++----------
 .../src/test/scala/unit/kafka/log/LogTest.scala | 40 ++++++++++++++++++++
 7 files changed, 109 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e36e8079/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index b945062..0301762 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -95,6 +95,7 @@ public class MemoryRecords extends AbstractRecords {
      */
     public FilterResult filterTo(LogEntryFilter filter, ByteBuffer buffer) {
         long maxTimestamp = Record.NO_TIMESTAMP;
+        long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
         int messagesRead = 0;
         int bytesRead = 0;
@@ -126,6 +127,9 @@ public class MemoryRecords extends AbstractRecords {
                     if (shallowMagic != deepRecord.magic())
                         writeOriginalEntry = false;
 
+                    if (deepEntry.offset() > maxOffset)
+                        maxOffset = deepEntry.offset();
+
                     retainedEntries.add(deepEntry);
                 } else {
                     writeOriginalEntry = false;
@@ -159,7 +163,7 @@ public class MemoryRecords extends AbstractRecords {
             }
         }
 
-        return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, shallowOffsetOfMaxTimestamp);
+        return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
     }
 
     /**
@@ -233,6 +237,7 @@ public class MemoryRecords extends AbstractRecords {
         public final int bytesRead;
         public final int messagesRetained;
         public final int bytesRetained;
+        public final long maxOffset;
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
@@ -240,12 +245,14 @@ public class MemoryRecords extends AbstractRecords {
                             int bytesRead,
                             int messagesRetained,
                             int bytesRetained,
+                            long maxOffset,
                             long maxTimestamp,
                             long shallowOffsetOfMaxTimestamp) {
             this.messagesRead = messagesRead;
             this.bytesRead = bytesRead;
             this.messagesRetained = messagesRetained;
             this.bytesRetained = bytesRetained;
+            this.maxOffset = maxOffset;
             this.maxTimestamp = maxTimestamp;
             this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e36e8079/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d58a066..7a54b77 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -411,11 +411,17 @@ class Log(@volatile var dir: File,
         }
 
         // maybe roll the log if this segment is full
-        val segment = maybeRoll(messagesSize = validRecords.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp)
+        val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
+          maxTimestampInMessages = appendInfo.maxTimestamp,
+          maxOffsetInMessages = appendInfo.lastOffset)
+
 
         // now append to the log
-        segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
-          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords)
+        segment.append(firstOffset = appendInfo.firstOffset,
+          largestOffset = appendInfo.lastOffset,
+          largestTimestamp = appendInfo.maxTimestamp,
+          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
+          records = validRecords)
 
         // increment the log end offset
         updateLogEndOffset(appendInfo.lastOffset + 1)
@@ -453,7 +459,8 @@ class Log(@volatile var dir: File,
   private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
-    var firstOffset, lastOffset = -1L
+    var firstOffset = -1L
+    var lastOffset = -1L
     var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
     var maxTimestamp = Record.NO_TIMESTAMP
@@ -739,18 +746,28 @@ class Log(@volatile var dir: File,
    * </ol>
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
-  private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long): LogSegment = {
+  private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
     val segment = activeSegment
     val now = time.milliseconds
     val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
     if (segment.size > config.segmentSize - messagesSize ||
         (segment.size > 0 && reachedRollMs) ||
-        segment.index.isFull || segment.timeIndex.isFull) {
+        segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
       debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}},
" +
           s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
           s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries},
" +
           s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs
- segment.rollJitterMs}).")
-      roll()
+      /*
+        maxOffsetInMessages - Integer.MAX_VALUE is a heuristic value for the first offset in the set of messages.
+        Since the offset in messages will not differ by more than Integer.MAX_VALUE, this is guaranteed <= the real
+        first offset in the set. Determining the true first offset in the set requires decompression, which the follower
+        is trying to avoid during log append. Prior behavior assigned new baseOffset = logEndOffset from old segment.
+        This was problematic in the case that two consecutive messages differed in offset by
+        Integer.MAX_VALUE.toLong + 2 or more.  In this case, the prior behavior would roll a new log segment whose
+        base offset was too low to contain the next message.  This edge case is possible when a replica is recovering a
+        highly compacted topic from scratch.
+       */
+      roll(maxOffsetInMessages - Integer.MAX_VALUE)
     } else {
       segment
     }
@@ -762,10 +779,10 @@ class Log(@volatile var dir: File,
    *
    * @return The newly rolled segment
    */
-  def roll(): LogSegment = {
+  def roll(expectedNextOffset: Long = 0): LogSegment = {
     val start = time.nanoseconds
     lock synchronized {
-      val newOffset = logEndOffset
+      val newOffset = Math.max(expectedNextOffset, logEndOffset)
       val logFile = logFilename(dir, newOffset)
       val indexFile = indexFilename(dir, newOffset)
       val timeIndexFile = timeIndexFilename(dir, newOffset)
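
A small worked example of the base-offset heuristic described in the comment above, using the
offsets exercised by the new testOverCompactedLogRecovery test at the end of this patch
(REPL-style Scala, illustration only):

// Segment based at 0 holds only offset 0, so the log end offset is 1.
val logEndOffset        = 1L
// Last offset of the replicated message set after heavy compaction.
val maxOffsetInMessages = Int.MaxValue.toLong + 2
// Heuristic first offset: never above the true first offset of the set.
val heuristicBase       = maxOffsetInMessages - Int.MaxValue       // = 2
// roll() picks the larger of the heuristic and the current log end offset.
val newBaseOffset       = math.max(heuristicBase, logEndOffset)    // = 2
// The incoming offset now fits in the index: (Int.MaxValue + 2) - 2 == Int.MaxValue.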

http://git-wip-us.apache.org/repos/asf/kafka/blob/e36e8079/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7ddb2c4..7676c88 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -476,10 +476,12 @@ private[log] class Cleaner(val id: Int,
       // if any messages are to be retained, write them out
       if (writeBuffer.position > 0) {
         writeBuffer.flip()
-
         val retained = MemoryRecords.readableRecords(writeBuffer)
-        dest.append(firstOffset = retained.deepIterator().next().offset, largestTimestamp = result.maxTimestamp,
-          shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp, records = retained)
+        dest.append(firstOffset = retained.deepIterator().next().offset,
+          largestOffset = result.maxOffset,
+          largestTimestamp = result.maxTimestamp,
+          shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
+          records = retained)
         throttler.maybeThrottle(writeBuffer.limit)
       }
       

http://git-wip-us.apache.org/repos/asf/kafka/blob/e36e8079/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index bd4eb68..c08a0bc 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -79,7 +79,14 @@ class LogSegment(val log: FileRecords,
   /* Return the size in bytes of this log segment */
   def size: Long = log.sizeInBytes()
 
-  /**
+   /**
+     * checks that the argument offset can be represented as an integer offset relative to the baseOffset.
+     */
+   def canConvertToRelativeOffset(offset: Long): Boolean = {
+     (offset - baseOffset) <= Integer.MAX_VALUE
+   }
+
+   /**
    * Append the given messages starting with the given offset. Add
    * an entry to the index if needed.
    *
@@ -91,7 +98,7 @@ class LogSegment(val log: FileRecords,
    * @param records The log entries to append.
    */
   @nonthreadsafe
-  def append(firstOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
+  def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
     if (records.sizeInBytes > 0) {
       trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at
shallow offset %d"
           .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp,
shallowOffsetOfMaxTimestamp))
@@ -99,9 +106,9 @@ class LogSegment(val log: FileRecords,
       if (physicalPosition == 0)
         rollingBasedTimestamp = Some(largestTimestamp)
       // append the messages
+      require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
       val appendedBytes = log.append(records)
       trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
-
       // Update the in memory max timestamp and corresponding offset.
       if (largestTimestamp > maxTimestampSoFar) {
         maxTimestampSoFar = largestTimestamp

http://git-wip-us.apache.org/repos/asf/kafka/blob/e36e8079/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index ad1b196..a59c02c 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -114,7 +114,7 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1)
       OffsetPosition(relativeOffset(idx, n), physical(idx, n))
     }
   }
-  
+
   /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/e36e8079/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index d99981a..1c747ec 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -75,7 +75,7 @@ class LogSegmentTest {
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there", "little", "bee")
-    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 53, Record.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
     assertEquals(ms.deepIterator.asScala.toList, read.deepIterator.asScala.toList)
   }
@@ -89,7 +89,7 @@ class LogSegmentTest {
     val baseOffset = 50
     val seg = createSegment(baseOffset)
     val ms = records(baseOffset, "hello", "there", "beautiful")
-    seg.append(baseOffset, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(baseOffset, 52, Record.NO_TIMESTAMP, -1L, ms)
     def validate(offset: Long) =
       assertEquals(ms.deepIterator.asScala.filter(_.offset == offset).toList,
                    seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepIterator.asScala.toList)
@@ -105,7 +105,7 @@ class LogSegmentTest {
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should give null", read)
   }
@@ -118,9 +118,9 @@ class LogSegmentTest {
   def testReadFromGap() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
+    seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
   }
@@ -135,9 +135,9 @@ class LogSegmentTest {
     var offset = 40
     for (_ <- 0 until 30) {
       val ms1 = records(offset, "hello")
-      seg.append(offset, Record.NO_TIMESTAMP, -1L, ms1)
+      seg.append(offset, offset, Record.NO_TIMESTAMP, -1L, ms1)
       val ms2 = records(offset + 1, "hello")
-      seg.append(offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
+      seg.append(offset + 1, offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
       assertEquals(List(ms1.deepIterator.next(), ms2.deepIterator.next()), read.records.deepIterator.asScala.toList)
@@ -156,7 +156,7 @@ class LogSegmentTest {
     val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
     var offset = 40
     for (_ <- 0 until numMessages) {
-      seg.append(offset, offset, offset, records(offset, "hello"))
+      seg.append(offset, offset, offset, offset, records(offset, "hello"))
       offset += 1
     }
     val expectedNumEntries = numMessages / 2 - 1
@@ -175,10 +175,10 @@ class LogSegmentTest {
   def testTruncateFull() {
     // test the case where we fully truncate the log
     val seg = createSegment(40)
-    seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(40, 41, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
     seg.truncateTo(0)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
-    seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(40, 41, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
 
   /**
@@ -190,7 +190,7 @@ class LogSegmentTest {
     val seg = createSegment(40, messageSize * 2 - 1)
     // Produce some messages
     for (i <- 40 until 50)
-      seg.append(i, i * 10, i, records(i, s"msg$i"))
+      seg.append(i, i, i * 10, i, records(i, s"msg$i"))
 
     assertEquals(490, seg.largestTimestamp)
     // Search for an indexed timestamp
@@ -214,7 +214,7 @@ class LogSegmentTest {
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
     assertEquals(40, seg.nextOffset)
-    seg.append(50, Record.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
+    seg.append(50, 52, Record.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
     assertEquals(53, seg.nextOffset())
   }
 
@@ -241,7 +241,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
+      seg.append(i, i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.index.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
@@ -257,7 +257,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptTimeIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, i * 10, i, records(i, i.toString))
+      seg.append(i, i, i * 10, i, records(i, i.toString))
     val timeIndexFile = seg.timeIndex.file
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
     seg.recover(64*1024)
@@ -277,7 +277,7 @@ class LogSegmentTest {
     for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
-        seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
+        seg.append(i, i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
      // start corrupting somewhere in the middle of the chosen record all the way to the end
 
@@ -303,9 +303,9 @@ class LogSegmentTest {
   def testCreateWithInitFileSizeAppendMessage() {
     val seg = createSegment(40, false, 512*1024*1024, true)
     val ms = records(50, "hello", "there")
-    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
+    seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
   }
@@ -317,9 +317,9 @@ class LogSegmentTest {
    val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true)
 
     val ms = records(50, "hello", "there")
-    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
+    seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
     val oldSize = seg.log.sizeInBytes()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e36e8079/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fcf9c89..49381a4 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1015,6 +1015,46 @@ class LogTest extends JUnitSuite {
   }
 
   @Test
+  def testOverCompactedLogRecovery(): Unit = {
+    // append some messages to create some segments
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+    val config = LogConfig(logProps)
+    val log = new Log(logDir,
+      config,
+      recoveryPoint = 0L,
+      time.scheduler,
+      time)
+    val set1 = MemoryRecords.withRecords(0, Record.create("v1".getBytes(), "k1".getBytes()))
+    val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, Record.create("v3".getBytes(), "k3".getBytes()))
+    val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, Record.create("v4".getBytes(), "k4".getBytes()))
+    val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, Record.create("v5".getBytes(), "k5".getBytes()))
+    //Writes into an empty log with baseOffset 0
+    log.append(set1, false)
+    assertEquals(0L, log.activeSegment.baseOffset)
+    //This write will roll the segment, yielding a new segment with base offset = max(2, 1) = 2
+    log.append(set2, false)
+    assertEquals(2L, log.activeSegment.baseOffset)
+    //This will also roll the segment, yielding a new segment with base offset = max(3, Integer.MAX_VALUE+3) = Integer.MAX_VALUE+3
+    log.append(set3, false)
+    assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
+    //This will go into the existing log
+    log.append(set4, false)
+    assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
+    log.close()
+    val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index"))
+    assertEquals(3, indexFiles.length)
+    for (file <- indexFiles) {
+      val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong)
+      assertTrue(offsetIndex.lastOffset >= 0)
+      offsetIndex.close()
+    }
+    Utils.delete(logDir)
+  }
+
+  @Test
   def testCleanShutdownFile() {
     // append some messages to create some segments
     val logProps = new Properties()

