kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7076; Skip rebuilding producer state when using old message format (#5254)
Date Wed, 27 Jun 2018 21:48:18 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2db7eb7  KAFKA-7076; Skip rebuilding producer state when using old message format (#5254)
2db7eb7 is described below

commit 2db7eb7a8c28f4f5d2550b45a9215948652f82ca
Author: Dhruvil Shah <dhruvil@confluent.io>
AuthorDate: Wed Jun 27 14:48:08 2018 -0700

    KAFKA-7076; Skip rebuilding producer state when using old message format (#5254)
    
    This patch removes the need to build up producer state when the log is using the V0 / V1 message format, which did not yet support idempotent or transactional producers.
    
    Also fixes a small issue where we incorrectly reported the offset index as corrupt if the last offset in the index is equal to the base offset of the segment.
---
 core/src/main/scala/kafka/log/Log.scala            | 61 +++++++++++---------
 core/src/main/scala/kafka/log/OffsetIndex.scala    |  4 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 67 +++++++++++++++++-----
 .../scala/unit/kafka/log/OffsetIndexTest.scala     |  9 +++
 4 files changed, 98 insertions(+), 43 deletions(-)
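
A note on the core idea in the Log.scala change below: on message formats older than V2 there can be no idempotent or transactional producers, so instead of scanning segments to rebuild producer state, the broker can write empty snapshots at a few strategic offsets. The following is a minimal, self-contained Scala sketch of that gating logic; the types are simplified, and `ProducerStateRebuildSketch`, `canSkipRebuild` and `offsetsToSnapshot` are illustrative stand-ins rather than the actual Log.scala members.

    object ProducerStateRebuildSketch {
      val MagicValueV2: Byte = 2  // RecordBatch.MAGIC_VALUE_V2 in the real code

      // Mirrors the patched condition: pre-V2 formats never need a rebuild;
      // otherwise the scan is skipped only after a clean shutdown that left
      // no producer snapshot on disk.
      def canSkipRebuild(messageFormatVersion: Byte,
                         latestSnapshotOffset: Option[Long],
                         reloadFromCleanShutdown: Boolean): Boolean =
        messageFormatVersion < MagicValueV2 ||
          (latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)

      // The offsets that receive empty snapshots when the scan is skipped:
      // the base offsets of the last two segments plus the last offset.
      def offsetsToSnapshot(segmentBaseOffsets: Seq[Long], lastOffset: Long): Seq[Long] =
        (segmentBaseOffsets.takeRight(2) :+ lastOffset).distinct
    }

Taking empty snapshots at the last two segment boundaries means a later truncation can usually be served from a snapshot instead of forcing a full rescan of the log.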

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 3036018..e4be8fc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -246,6 +246,10 @@ class Log(@volatile var dir: File,
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
     _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
 
+    // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
+    // from scratch.
+    if (!producerStateManager.isEmpty)
+      throw new IllegalStateException("Producer state must be empty during log initialization")
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
 
     info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset
and " +
@@ -417,25 +421,14 @@ class Log(@volatile var dir: File,
    * @return The number of bytes truncated from the segment
   * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
    */
-  private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
-    val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
-    stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
-    logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
-      val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset)
-      val fetchDataInfo = segment.read(startOffset, None, Int.MaxValue)
-      if (fetchDataInfo != null)
-        loadProducersFromLog(stateManager, fetchDataInfo.records)
-    }
-    stateManager.updateMapEndOffset(segment.baseOffset)
-
-    // take a snapshot for the first recovered segment to avoid reloading all the segments if we shutdown before we
-    // checkpoint the recovery point
-    stateManager.takeSnapshot()
-    val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
-
+  private def recoverSegment(segment: LogSegment,
+                             leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+    rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)
+    val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
     // once we have recovered the segment's data, take a snapshot to ensure that we won't
     // need to reload the same segment again while recovering another segment.
-    stateManager.takeSnapshot()
+    producerStateManager.takeSnapshot()
     bytesTruncated
   }
 
@@ -565,10 +558,22 @@ class Log(@volatile var dir: File,
     recoveryPoint
   }
 
-  private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
+  // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
+  // free of all side-effects, i.e. it must not update any log-specific state.
+  private def rebuildProducerState(lastOffset: Long,
+                                   reloadFromCleanShutdown: Boolean,
+                                   producerStateManager: ProducerStateManager): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
     val messageFormatVersion = config.messageFormatVersion.recordVersion.value
-    info(s"Loading producer state from offset $lastOffset with message format version $messageFormatVersion")
+    val segments = logSegments
+    val offsetsToSnapshot =
+      if (segments.nonEmpty) {
+        val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset)
+        Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset))
+      } else {
+        Seq(Some(lastOffset))
+      }
+    info(s"Loading producer state till offset $lastOffset with message format version $messageFormatVersion")
 
    // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
    // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
@@ -582,13 +587,11 @@ class Log(@volatile var dir: File,
    // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
    // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
     // from the first segment.
-
-    if (producerStateManager.latestSnapshotOffset.isEmpty && (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || reloadFromCleanShutdown)) {
+    if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
+        (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
      // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
      // last two segments and the last offset. This should avoid the full scan in the case that the log needs
       // truncation.
-      val nextLatestSegmentBaseOffset = lowerSegment(activeSegment.baseOffset).map(_.baseOffset)
-      val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
       offsetsToSnapshot.flatten.foreach { offset =>
         producerStateManager.updateMapEndOffset(offset)
         producerStateManager.takeSnapshot()
@@ -607,19 +610,25 @@ class Log(@volatile var dir: File,
         logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
          val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
           producerStateManager.updateMapEndOffset(startOffset)
-          producerStateManager.takeSnapshot()
+
+          if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
+            producerStateManager.takeSnapshot()
 
           val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
           if (fetchDataInfo != null)
             loadProducersFromLog(producerStateManager, fetchDataInfo.records)
         }
       }
-
       producerStateManager.updateMapEndOffset(lastOffset)
-      updateFirstUnstableOffset()
+      producerStateManager.takeSnapshot()
     }
   }
 
+  private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
+    rebuildProducerState(lastOffset, reloadFromCleanShutdown, producerStateManager)
+    updateFirstUnstableOffset()
+  }
+
  private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
     val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 2babd00..6f246ee 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -188,9 +188,9 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   }
 
   override def sanityCheck() {
-    if (_entries != 0 && _lastOffset <= baseOffset)
+    if (_entries != 0 && _lastOffset < baseOffset)
       throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath})
has non-zero size " +
-        s"but the last offset is ${_lastOffset} which is no greater than the base offset
$baseOffset.")
+        s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.")
     if (length % entrySize != 0)
       throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is corrupt, found
$length bytes which is " +
         s"neither positive nor a multiple of $entrySize.")
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3b5b2fa..9a9bc61 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
 import java.util.Properties
 
+import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
@@ -206,8 +207,6 @@ class LogTest {
 
     // Reload after unclean shutdown with recoveryPoint set to log end offset
     log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
-    // Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case
-    expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset)
     assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
     log.close()
 
@@ -215,15 +214,24 @@ class LogTest {
 
     // Reload after unclean shutdown with recoveryPoint set to 0
     log = createLog(logDir, logConfig, recoveryPoint = 0L)
-    // Is this working as intended?
+    // We progressively create a snapshot for each segment after the recovery point
     expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset
     assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
     log.close()
   }
 
   @Test
-  def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
+  def testProducerSnapshotsRecoveryAfterUncleanShutdownV1(): Unit = {
+    testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.minSupportedFor(RecordVersion.V1).version)
+  }
+
+  @Test
+  def testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat(): Unit = {
+    testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.latestVersion.version)
+  }
+
+  private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
     var log = createLog(logDir, logConfig)
     assertEquals(None, log.oldestProducerSnapshotOffset)
 
@@ -247,6 +255,16 @@ class LogTest {
 
     val segmentsWithReads = ArrayBuffer[LogSegment]()
     val recoveredSegments = ArrayBuffer[LogSegment]()
+    val expectedSegmentsWithReads = ArrayBuffer[Long]()
+    val expectedSnapshotOffsets = ArrayBuffer[Long]()
+
+    if (logConfig.messageFormatVersion < KAFKA_0_11_0_IV0) {
+      expectedSegmentsWithReads += activeSegmentOffset
+      expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
+    } else {
+      expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Seq(activeSegmentOffset)
+      expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
+    }
 
     def createLogWithInterceptedReads(recoveryPoint: Long) = {
       val maxProducerIdExpirationMs = 60 * 60 * 1000
@@ -283,9 +301,8 @@ class LogTest {
    ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2))
     log = createLogWithInterceptedReads(offsetForRecoveryPointSegment)
    // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour)
-    assertEquals(segOffsetsBeforeRecovery, segmentsWithReads.map(_.baseOffset) -- Seq(activeSegmentOffset))
+    assertEquals(expectedSegmentsWithReads, segmentsWithReads.map(_.baseOffset))
     assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
-    var expectedSnapshotOffsets = segmentOffsets.takeRight(4) :+ log.logEndOffset
     assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
     log.close()
     segmentsWithReads.clear()
@@ -297,13 +314,12 @@ class LogTest {
     log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint)
     assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
     assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
-    expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
     assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
 
     // Verify that we keep 2 snapshot files if we checkpoint the log end offset
     log.deleteSnapshotsAfterRecoveryPointCheckpoint()
-    expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
-    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+    val expectedSnapshotsAfterDelete = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
+    assertEquals(expectedSnapshotsAfterDelete, listProducerSnapshotOffsets)
     log.close()
   }
 
@@ -398,6 +414,9 @@ class LogTest {
     // We skip directly to updating the map end offset
     stateManager.updateMapEndOffset(1L)
     EasyMock.expectLastCall()
+    // Finally, we take a snapshot
+    stateManager.takeSnapshot()
+    EasyMock.expectLastCall().once()
 
     EasyMock.replay(stateManager)
 
@@ -410,14 +429,18 @@ class LogTest {
   def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
     val stateManager = EasyMock.mock(classOf[ProducerStateManager])
 
-    EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
-
     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()
 
     stateManager.takeSnapshot()
     EasyMock.expectLastCall().anyTimes()
 
+    EasyMock.expect(stateManager.isEmpty).andReturn(true)
+    EasyMock.expectLastCall().once()
+
+    EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
+    EasyMock.expectLastCall().once()
+
     EasyMock.replay(stateManager)
 
     val logProps = new Properties()
@@ -443,14 +466,18 @@ class LogTest {
   def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
     val stateManager = EasyMock.mock(classOf[ProducerStateManager])
 
-    EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
-
     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()
 
     stateManager.takeSnapshot()
     EasyMock.expectLastCall().anyTimes()
 
+    EasyMock.expect(stateManager.isEmpty).andReturn(true)
+    EasyMock.expectLastCall().once()
+
+    EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
+    EasyMock.expectLastCall().once()
+
     EasyMock.replay(stateManager)
 
     val cleanShutdownFile = createCleanShutdownFile()
@@ -487,6 +514,12 @@ class LogTest {
     stateManager.takeSnapshot()
     EasyMock.expectLastCall().anyTimes()
 
+    EasyMock.expect(stateManager.isEmpty).andReturn(true)
+    EasyMock.expectLastCall().once()
+
+    EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
+    EasyMock.expectLastCall().once()
+
     EasyMock.replay(stateManager)
 
     val cleanShutdownFile = createCleanShutdownFile()
@@ -644,8 +677,12 @@ class LogTest {
     assertEquals(2, log.latestProducerStateEndOffset)
 
     log.truncateTo(1)
-    assertEquals(None, log.latestProducerSnapshotOffset)
+    assertEquals(Some(1), log.latestProducerSnapshotOffset)
     assertEquals(1, log.latestProducerStateEndOffset)
+
+    log.truncateTo(0)
+    assertEquals(None, log.latestProducerSnapshotOffset)
+    assertEquals(0, log.latestProducerStateEndOffset)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 1529597..f47da99 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -178,6 +178,15 @@ class OffsetIndexTest extends JUnitSuite {
     // mmap should be null after unmap causing lookup to throw a NPE
     intercept[NullPointerException](idx.lookup(1))
   }
+
+  @Test
+  def testSanityLastOffsetEqualToBaseOffset(): Unit = {
+    // Test index sanity for the case where the last offset appended to the index is equal to the base offset
+    val baseOffset = 20L
+    val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = baseOffset, maxIndexSize = 10 * 8)
+    idx.append(baseOffset, 0)
+    idx.sanityCheck()
+  }
   
  def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) {
     try {
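
For the OffsetIndex change above, the boundary case the old check misreported is worth spelling out: an index whose only entry carries an offset equal to the segment's base offset is valid (a relative offset of 0), but the old `<=` comparison flagged it as corrupt. Below is a REPL-style Scala sketch of the corrected comparison; `IndexSketch` is a simplified stand-in, not the real kafka.log.OffsetIndex class.

    // Simplified model: equality is legal; only `<` indicates corruption.
    final case class IndexSketch(baseOffset: Long, entries: Int, lastOffset: Long) {
      def sanityCheck(): Unit =
        if (entries != 0 && lastOffset < baseOffset)  // was `<=` before this patch
          throw new IllegalStateException(
            s"non-empty index but last offset $lastOffset is less than base offset $baseOffset")
    }

    // The case exercised by testSanityLastOffsetEqualToBaseOffset: now passes.
    IndexSketch(baseOffset = 20L, entries = 1, lastOffset = 20L).sanityCheck()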

