kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)
Date Fri, 25 Oct 2019 21:49:34 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 815a228  KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)
815a228 is described below

commit 815a2287357bf4028e8a78e3ac15ada951d4067d
Author: Bob Barrett <bob.barrett@confluent.io>
AuthorDate: Fri Oct 25 14:40:45 2019 -0700

    KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)
    
    The truncateHead method was removed from ProducerStateManager by github.com/apache/kafka/commit/c49775b.
    This meant that snapshots were no longer removed when the log start offset increased, even
    though the intent of that change was to remove snapshots but preserve the in-memory mapping.
    This patch adds the required functionality back.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/Log.scala            |  1 +
 .../scala/kafka/log/ProducerStateManager.scala     | 21 ++++++++++++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 29 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 2 deletions(-)
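
For readers skimming the diff below: the invariant this patch restores is that advancing the log start offset deletes on-disk producer snapshots older than the new offset while leaving the in-memory producer map intact. A minimal, self-contained Scala sketch of that split (ToyProducerState and its members are illustrative stand-ins, not Kafka's actual classes):

    import scala.collection.mutable

    // Toy stand-in for ProducerStateManager's bookkeeping: `producers` models
    // the in-memory map that must survive head truncation; `snapshotOffsets`
    // models the offsets of the snapshot files on disk.
    class ToyProducerState {
      val producers = mutable.Map.empty[Long, Int]        // producerId -> last sequence
      val snapshotOffsets = mutable.SortedSet.empty[Long] // one entry per snapshot file
      var lastMapOffset = 0L
      var lastSnapOffset = 0L

      // Mirrors the intent of the restored truncateHead: drop snapshots older
      // than the new log start offset, leave `producers` untouched.
      def truncateHead(logStartOffset: Long): Unit = {
        if (lastMapOffset < logStartOffset)
          lastMapOffset = logStartOffset
        snapshotOffsets --= snapshotOffsets.filter(_ < logStartOffset)
        lastSnapOffset = snapshotOffsets.lastOption.getOrElse(logStartOffset)
      }
    }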

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5e04c3c..ccf1d16 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1254,6 +1254,7 @@ class Log(@volatile var dir: File,
           info(s"Incrementing log start offset to $newLogStartOffset")
           logStartOffset = newLogStartOffset
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+          producerStateManager.truncateHead(newLogStartOffset)
           maybeIncrementFirstUnstableOffset()
         }
       }
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ae5b77a..04dafd8 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -598,8 +598,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
   * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove
   * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of
-   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost.
-   * Note that the log end offset is assumed to be less than or equal to the high watermark.
+   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost, which
+   * would lead to UNKNOWN_PRODUCER_ID errors. Note that the log end offset is assumed to be less than or equal
+   * to the high watermark.
   */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
@@ -615,6 +616,8 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
+    } else {
+      truncateHead(logStartOffset)
     }
   }
 
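The new else branch covers the case where the in-memory map already matches the log end offset: there is nothing to reload from a snapshot, but the map's bookkeeping (unreplicated transactions, lastMapOffset, lastSnapOffset) still has to be realigned with the new start offset, which truncateHead now does. An illustrative sketch of that decision, continuing the ToyProducerState model above (`mapEndOffset` and the snapshot reload are stand-ins for the real fields):

    // Illustrative shape only; the real method also clears transaction state
    // and handles snapshot timestamps.
    def truncateAndReload(state: ToyProducerState,
                          logStartOffset: Long,
                          logEndOffset: Long,
                          mapEndOffset: Long): Unit = {
      if (logEndOffset != mapEndOffset) {
        state.producers.clear() // map is stale relative to the log: rebuild
        // a real implementation would repopulate `producers` here from the
        // most recent snapshot at or below logEndOffset
      } else {
        state.truncateHead(logStartOffset) // map is current: just realign it
      }
    }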
@@ -688,6 +691,20 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
 
+  /**
+   * When we remove the head of the log due to retention, we need to remove snapshots older than
+   * the new log start offset.
+   */
+  def truncateHead(logStartOffset: Long): Unit = {
+    removeUnreplicatedTransactions(logStartOffset)
+
+    if (lastMapOffset < logStartOffset)
+      lastMapOffset = logStartOffset
+
+    deleteSnapshotsBefore(logStartOffset)
+    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
+  }
+
   private def removeUnreplicatedTransactions(offset: Long): Unit = {
     val iterator = unreplicatedTxns.entrySet.iterator
     while (iterator.hasNext) {
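
truncateHead first prunes unreplicated transactions whose last offset now precedes the log start offset, since they can never complete; the context lines above show the start of that loop. A self-contained sketch of its shape (ToyTxnMetadata and its fields are guesses for illustration, not the real types):

    import java.util.{TreeMap => JTreeMap}

    // Guessed shape of the cleanup truncateHead invokes: transactions that
    // already ended below the new log start offset are dropped from the map.
    final case class ToyTxnMetadata(producerId: Long, lastOffset: Option[Long])

    def removeUnreplicatedTransactions(unreplicatedTxns: JTreeMap[Long, ToyTxnMetadata],
                                       offset: Long): Unit = {
      val iterator = unreplicatedTxns.entrySet.iterator
      while (iterator.hasNext) {
        val txnEntry = iterator.next()
        if (txnEntry.getValue.lastOffset.exists(_ < offset))
          iterator.remove()
      }
    }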
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fdf40c6..570e252 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1504,6 +1504,35 @@ class LogTest {
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
+  @Test
+  def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val pid2 = 2L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    log.maybeIncrementLogStartOffset(2L)
+
+    // Deleting records should not remove producer state but should delete snapshots
+    assertEquals(2, log.activeProducersWithLastSequence.size)
+    assertEquals(1, ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
+  }
+
   /**
   * Test for jitter s for time based log roll. This test appends messages then changes the time
    * using the mock clock to force the log to roll and checks the number of segments.
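
The new test pins the behaviour down: two producers across two rolled segments yield two snapshots, and advancing the log start offset to 2 must keep both producers in memory while deleting the older snapshot. The same scenario run against the toy sketch from earlier in this message (illustrative only, not the real test harness):

    object TruncateHeadDemo extends App {
      val state = new ToyProducerState
      state.producers ++= Map(1L -> 0, 2L -> 0) // pid1 and pid2, last sequence 0
      state.snapshotOffsets ++= Seq(1L, 2L)     // one snapshot per rolled segment

      state.truncateHead(2L)                    // advance the log start offset

      assert(state.producers.size == 2)              // producer state preserved
      assert(state.snapshotOffsets.toSet == Set(2L)) // older snapshot deleted
      assert(state.producers(2L) == 0)               // pid2's last sequence intact
    }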

