kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7959; Delete leader epoch cache files with old message format versions (#6298)
Date Fri, 22 Feb 2019 22:56:28 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 48986f3  KAFKA-7959; Delete leader epoch cache files with old message format versions (#6298)
48986f3 is described below

commit 48986f380efa3ddfe3cc5fbaa1544d679fc04896
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Fri Feb 22 22:56:08 2019 +0000

    KAFKA-7959; Delete leader epoch cache files with old message format versions (#6298)
    
    It is important to clean up any cached epochs that may exist if the log message format
    does not support it (due to a regression in KAFKA-7415). Otherwise, the broker might make
    use of them once it upgrades its message format. This can cause unnecessary truncation of
    data.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../apache/kafka/common/record/RecordVersion.java  |  9 ++++
 core/src/main/scala/kafka/cluster/Replica.scala    |  6 +--
 core/src/main/scala/kafka/log/Log.scala            | 21 ++++++++-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  |  4 ++
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 52 ++++++++++++++++++++++
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  1 +
 6 files changed, 88 insertions(+), 5 deletions(-)
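For context on the truncation hazard the message describes, here is an illustrative sketch, not the broker's actual fetcher code: a follower truncates to the smaller of its own log end offset and the end offset the leader reports for the follower's latest cached epoch, so a stale cached epoch yields a stale truncation point.

    // Illustrative only: with a stale epoch in its cache, a follower asks the
    // leader for that epoch's end offset and truncates to the smaller of the
    // leader's answer and its own log end offset, discarding valid records.
    def truncationOffset(leaderEndOffsetForCachedEpoch: Long, followerLogEndOffset: Long): Long =
      math.min(leaderEndOffsetForCachedEpoch, followerLogEndOffset)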

diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
index 1f80d62..8406d53 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
@@ -34,6 +34,15 @@ public enum RecordVersion {
         this.value = (byte) value;
     }
 
+    /**
+     * Check whether this version precedes another version.
+     *
+     * @return true only if the magic value is less than the other's
+     */
+    public boolean precedes(RecordVersion other) {
+        return this.value < other.value;
+    }
+
     public static RecordVersion lookup(byte value) {
         if (value < 0 || value >= VALUES.length)
             throw new IllegalArgumentException("Unknown record version: " + value);
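The new helper simply compares magic values (V0 = 0, V1 = 1, V2 = 2). A usage sketch of the downgrade check, not part of the patch:

    import org.apache.kafka.common.record.RecordVersion

    // newRecordVersion.precedes(oldRecordVersion) flags a downgrade:
    assert(RecordVersion.V1.precedes(RecordVersion.V2))   // V1 is older than V2
    assert(!RecordVersion.V2.precedes(RecordVersion.V2))  // same version is not a downgrade
    assert(!RecordVersion.V2.precedes(RecordVersion.V1))  // an upgrade is not a downgrade
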
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index f02308e..7ddd58f 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -59,10 +59,10 @@ class Replica(val brokerId: Int,
 
   def epochs: Option[LeaderEpochFileCache] = {
     log.flatMap { log =>
-      if (log.recordVersion.value < RecordVersion.V2.value)
-        None
-      else
+      if (log.supportsLeaderEpoch)
         Some(log.leaderEpochCache)
+      else
+        None
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d79d9f1..01ec5a8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -37,6 +37,7 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetSnapshot}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -183,6 +184,8 @@ class Log(@volatile var dir: File,
 
   def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion
 
+  def supportsLeaderEpoch = recordVersion.value >= RecordVersion.V2.value
+
   def initFileSize: Int = {
     if (config.preallocate)
       config.segmentSize
@@ -198,7 +201,15 @@ class Log(@volatile var dir: File,
       warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}.
It is smaller than " +
         s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}.
" +
         s"This may result in frequent log rolling.")
+    val oldConfig = this.config
     this.config = newConfig
+    if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
+      val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+      val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+      if (newRecordVersion.precedes(oldRecordVersion))
+        warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
+      _leaderEpochCache = initializeLeaderEpochCache()
+    }
   }
 
   private def checkIfMemoryMappedBufferClosed(): Unit = {
@@ -302,7 +313,13 @@ class Log(@volatile var dir: File,
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
     val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
-    new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
+    val cache = new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
+
+    if (!supportsLeaderEpoch && cache.nonEmpty) {
+      warn(s"Clearing non-empty leader epoch cache due to incompatible message format $recordVersion")
+      cache.clearAndFlush()
+    }
+    cache
   }
 
   /**
@@ -541,7 +558,7 @@ class Log(@volatile var dir: File,
         info(s"Recovering unflushed segment ${segment.baseOffset}")
         val truncatedBytes =
           try {
-            recoverSegment(segment, Some(_leaderEpochCache))
+            recoverSegment(segment, if (supportsLeaderEpoch) Some(_leaderEpochCache) else None)
           } catch {
             case _: InvalidOffsetException =>
               val startOffset = segment.baseOffset
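Taken together, the Log changes gate every epoch-cache touch point on the record version: updateConfig rebuilds the cache after a format change (warning on downgrade), and initializeLeaderEpochCache wipes it whenever the format predates V2. A minimal self-contained model of that invariant; the Cache class here is a stand-in, not Kafka's LeaderEpochFileCache:

    import org.apache.kafka.common.record.RecordVersion

    // Stand-in cache: Kafka's clearAndFlush() also rewrites the checkpoint file.
    final class Cache(var entries: Map[Int, Long]) {
      def nonEmpty: Boolean = entries.nonEmpty
      def clearAndFlush(): Unit = entries = Map.empty
    }

    // The invariant the patch enforces: a log whose format predates V2
    // never carries epoch entries across (re)initialization.
    def reinitialize(cache: Cache, recordVersion: RecordVersion): Cache = {
      val supportsLeaderEpoch = recordVersion.value >= RecordVersion.V2.value
      if (!supportsLeaderEpoch && cache.nonEmpty)
        cache.clearAndFlush()
      cache
    }
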
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index cee6bb6..9e7077d 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -84,6 +84,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def nonEmpty: Boolean = inReadLock(lock) {
+    epochs.nonEmpty
+  }
+
   /**
     * Returns the current Leader Epoch. This is the latest epoch
     * which has messages assigned to it.
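The new nonEmpty accessor reads the underlying map under the cache's read lock, matching the other accessors in this file. A reduced sketch of that inReadLock idiom using plain JDK locks (Kafka's actual helper lives in kafka.utils.CoreUtils):

    import java.util.concurrent.locks.ReentrantReadWriteLock

    val lock = new ReentrantReadWriteLock()

    // Acquire the read lock, evaluate the body, always release.
    def inReadLock[T](body: => T): T = {
      lock.readLock().lock()
      try body
      finally lock.readLock().unlock()
    }

    // e.g. def nonEmpty: Boolean = inReadLock { epochs.nonEmpty }
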
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b0f215e..d8b4043 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -25,6 +25,7 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
@@ -33,6 +34,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -2598,6 +2600,56 @@ class LogTest {
   def topicPartitionName(topic: String, partition: String): String =
     topic + "-" + partition
 
+  /**
+    * Due to KAFKA-7968, we want to make sure that we do not
+    * make use of old leader epoch cache files when the message format does not support it
+    */
+  @Test
+  def testOldMessageFormatDeletesEpochCacheIfUnsupported(): Unit = {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
+    val epochCacheSupportingConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+
+    // append some records to create segments and assign some epochs to create epoch files
+    val log = createLog(logDir, epochCacheSupportingConfig)
+    for (_ <- 0 until 100)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    log.leaderEpochCache.assign(0, 40)
+    log.leaderEpochCache.assign(1, 90)
+    assertEquals((1, 100), log.leaderEpochCache.endOffsetFor(1))
+
+    // instantiate the log with an old format that does not support the leader epoch
+    val epochCacheNonSupportingConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000,
+      retentionMs = 999, messageFormatVersion = "0.10.2")
+    val log2 = createLog(logDir, epochCacheNonSupportingConfig)
+    assertLeaderEpochCacheEmpty(log2)
+  }
+
+  @Test
+  def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals((5, 1), log.leaderEpochCache.endOffsetFor(5))
+
+    val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
+      maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.shortVersion)
+    log.updateConfig(Set(LogConfig.MessageFormatVersionProp), downgradedLogConfig)
+    assertLeaderEpochCacheEmpty(log)
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())),
+      magicValue = RecordVersion.V1.value), leaderEpoch = 5)
+    assertLeaderEpochCacheEmpty(log)
+  }
+
+  private def assertLeaderEpochCacheEmpty(log: Log): Unit = {
+    assertFalse(log.leaderEpochCache.nonEmpty)
+
+    // check that the file is empty as well
+    val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(log.dir))
+    val cache = new LeaderEpochFileCache(log.topicPartition, log.logEndOffset _, checkpointFile)
+    assertFalse(cache.nonEmpty)
+  }
+
   @Test
   def testDeleteOldSegments() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
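Note that assertLeaderEpochCacheEmpty goes one step further than checking the in-memory cache: it re-reads the on-disk checkpoint to prove that clearAndFlush() really persisted the empty state. For reference, the leader-epoch-checkpoint file is plain text; a hedged reader sketch, assuming the current format of a version line, an entry-count line, then one "epoch startOffset" pair per line:

    import scala.io.Source

    // Illustration only: Kafka reads this via LeaderEpochCheckpointFile,
    // which adds validation and atomic-swap writes.
    def readCheckpoint(path: String): Vector[(Int, Long)] = {
      val source = Source.fromFile(path)
      try {
        source.getLines().drop(2).map { line =>       // skip version + entry count
          val Array(epoch, startOffset) = line.split(" ")
          (epoch.toInt, startOffset.toLong)
        }.toVector
      } finally source.close()
    }
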
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 17c95fe..0c317be 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -51,6 +51,7 @@ class OffsetsForLeaderEpochTest {
     val logManager = createNiceMock(classOf[kafka.log.LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
+    expect(mockLog.supportsLeaderEpoch).andReturn(true).anyTimes()
     expect(mockLog.recordVersion).andReturn(RecordVersion.V2).anyTimes()
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(mockCache, mockLog, logManager)

