kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
Date Fri, 03 May 2019 00:46:13 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ba4686  KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
3ba4686 is described below

commit 3ba4686d4d650f0f9155b2e22dddb192a5a56a6c
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu May 2 17:46:01 2019 -0700

    KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
    
    During a partial message format upgrade, it is possible for the message format to flap
between new and old versions. If we detect that data appended to the log is on an old format,
we can clear the leader epoch cache so that we revert to truncation by high watermark. Once
the upgrade completes and all replicas are on the same format, we will append to the epoch
cache as usual. Note this is related to KAFKA-7897, which handles message format downgrades
through configuration.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala          | 11 ++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 87179db..df71560 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -909,8 +909,17 @@ class Log(@volatile var dir: File,
 
         // update the epoch cache with the epoch stamped onto the message by the leader
         validRecords.batches.asScala.foreach { batch =>
-          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
             maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
+          } else {
+            // In partial upgrade scenarios, we may get a temporary regression to the message
format. In
+            // order to ensure the safety of leader election, we clear the epoch cache so
that we revert
+            // to truncation by high watermark after the next leader election.
+            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
+              warn(s"Clearing leader epoch cache after unexpected append with message format
v${batch.magic}")
+              cache.clearAndFlush()
+            }
+          }
         }
 
         // check messages set size may be exceed config.segmentSize
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2b3e362..3d6fb0d 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2185,6 +2185,19 @@ class LogTest {
   }
 
   @Test
+  def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch
= 5)
+    assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
+
+    log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
+      baseOffset = 1L,
+      magicValue = RecordVersion.V1.value))
+    assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch))
+  }
+
+  @Test
   def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)


Mime
View raw message