kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-5186; Avoid expensive log scan to build producer state when upgrading
Date: Mon, 22 May 2017 22:43:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 67f691bbd -> a9c9dbf58


KAFKA-5186; Avoid expensive log scan to build producer state when upgrading

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3113 from hachikuji/KAFKA-5186

(cherry picked from commit fcdbb71953fc4c92559a9c7adb4cb8ad4a74acd6)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9c9dbf5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9c9dbf5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9c9dbf5

Branch: refs/heads/0.11.0
Commit: a9c9dbf58f0e9ccf873e100b62f0f811ad5e4659
Parents: 67f691b
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon May 22 15:41:26 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon May 22 15:41:39 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 50 +++++++++++++-------
 .../src/test/scala/unit/kafka/log/LogTest.scala | 27 +++++++++++
 2 files changed, 60 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c9dbf5/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7a47657..55eb46a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -421,25 +421,41 @@ class Log(@volatile var dir: File,
 
   private def loadProducerState(lastOffset: Long): Unit = lock synchronized {
     info(s"Loading producer state from offset $lastOffset for partition $topicPartition")
-    val currentTimeMs = time.milliseconds
-    producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
-
-    // only do the potentially expensive reloading if the last snapshot offset is lower than the
-    // log end offset (which would be the case on first startup) and there are active producers.
-    // if there are no active producers, then truncating shouldn't change that fact (although it
-    // could cause a producerId to expire earlier than expected), so we can skip the loading.
-    // This is an optimization for users who are not yet using idempotent/transactional features.
-    if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) {
-      logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
-        val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset)
-        val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
-        if (fetchDataInfo != null)
-          loadProducersFromLog(producerStateManager, fetchDataInfo.records)
+
+    if (producerStateManager.latestSnapshotOffset.isEmpty) {
+      // if there are no snapshots to load producer state from, we assume that the brokers are
+      // being upgraded, which means there would be no previous idempotent/transactional producers
+      // to load state for. To avoid an expensive scan through all of the segments, we take
+      // empty snapshots from the start of the last two segments and the last offset. The purpose
+      // of taking the segment snapshots is to avoid the full scan in the case that the log needs
+      // truncation.
+      val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset)
+      val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
+      offsetsToSnapshot.flatten.foreach { offset =>
+        producerStateManager.updateMapEndOffset(offset)
+        producerStateManager.takeSnapshot()
+      }
+    } else {
+      val currentTimeMs = time.milliseconds
+      producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
+
+      // only do the potentially expensive reloading if the last snapshot offset is lower than the
+      // log end offset (which would be the case on first startup) and there are active producers.
+      // if there are no active producers, then truncating shouldn't change that fact (although it
+      // could cause a producerId to expire earlier than expected), so we can skip the loading.
+      // This is an optimization for users who are not yet using idempotent/transactional features.
+      if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) {
+        logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
+          val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset)
+          val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
+          if (fetchDataInfo != null)
+            loadProducersFromLog(producerStateManager, fetchDataInfo.records)
+        }
       }
-    }
 
-    producerStateManager.updateMapEndOffset(lastOffset)
-    updateFirstUnstableOffset()
+      producerStateManager.updateMapEndOffset(lastOffset)
+      updateFirstUnstableOffset()
+    }
   }
 
  private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
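
For readers skimming the hunk above: the new latestSnapshotOffset.isEmpty branch seeds empty snapshots at the base offsets of the last two segments and at the log end offset, so a post-upgrade truncation can reload producer state from a nearby snapshot instead of rescanning every segment. The following is a minimal, self-contained Scala sketch of that seeding idea only; SnapshotStore and Segment are hypothetical stand-ins, not Kafka's actual ProducerStateManager and LogSegment APIs.

    object ProducerSnapshotSeedSketch {
      final case class Segment(baseOffset: Long)

      // Hypothetical stand-in for ProducerStateManager: snapshots keyed by offset.
      final class SnapshotStore {
        private var snapshots = Map.empty[Long, Map[Long, Int]] // offset -> (producerId -> sequence)
        private var mapEndOffset = 0L

        def latestSnapshotOffset: Option[Long] = snapshots.keys.reduceOption(_ max _)
        def updateMapEndOffset(offset: Long): Unit = mapEndOffset = offset
        // An empty map models "no idempotent/transactional producers yet".
        def takeSnapshot(): Unit = snapshots += (mapEndOffset -> Map.empty[Long, Int])
      }

      // Seed empty snapshots at the last two segment base offsets and at the log
      // end offset, mirroring the upgrade branch in the hunk above.
      def seedSnapshotsOnUpgrade(store: SnapshotStore, segments: Seq[Segment], logEndOffset: Long): Unit =
        if (store.latestSnapshotOffset.isEmpty) {
          val lastTwoBases = segments.map(_.baseOffset).sorted.takeRight(2)
          (lastTwoBases :+ logEndOffset).distinct.foreach { offset =>
            store.updateMapEndOffset(offset)
            store.takeSnapshot()
          }
        }
    }

With snapshots present at those offsets, the else branch's truncateAndReload path finds a snapshot at or near the truncation point and skips the full segment scan.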

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c9dbf5/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 99ebd15..84ff43b 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -169,6 +169,33 @@ class LogTest {
   }
 
   @Test
+  def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
+    // simulate the upgrade path by creating a new log with several segments, deleting the
+    // snapshot files, and then reloading the log
+
+    val log = createLog(64, messagesPerSegment = 10)
+    assertEquals(None, log.oldestProducerSnapshotOffset)
+
+    for (i <- 0 to 100) {
+      val record = new SimpleRecord(time.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+    }
+
+    assertTrue(log.logSegments.size >= 2)
+    log.close()
+
+    logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
+      Files.delete(file.toPath)
+    }
+
+    val reloadedLog = createLog(64, messagesPerSegment = 10)
+    val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset)
+    expectedSnapshotsOffsets.foreach { offset =>
+      assertTrue(Log.producerSnapshotFile(logDir, offset).exists)
+    }
+  }
+
+  @Test
   def testPidMapOffsetUpdatedForNonIdempotentData() {
     val log = createLog(2048)
    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)))

