kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
Date Fri, 03 May 2019 01:08:50 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 61dac29  KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
61dac29 is described below

commit 61dac2965f628e6dfc330181f534388a479d954f
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu May 2 17:46:01 2019 -0700

    KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
    
    During a partial message format upgrade, it is possible for the message format to flap
    between new and old versions. If we detect that data appended to the log is on an old format,
    we can clear the leader epoch cache so that we revert to truncation by high watermark. Once
    the upgrade completes and all replicas are on the same format, we will append to the epoch
    cache as usual. Note this is related to KAFKA-7897, which handles message format downgrades
    through configuration.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala          | 11 ++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index af0f343..9811a0e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -907,8 +907,17 @@ class Log(@volatile var dir: File,
 
         // update the epoch cache with the epoch stamped onto the message by the leader
         validRecords.batches.asScala.foreach { batch =>
-          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
             maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
+          } else {
+            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
+            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
+            // to truncation by high watermark after the next leader election.
+            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
+              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
+              cache.clearAndFlush()
+            }
+          }
         }
 
         // check messages set size may be exceed config.segmentSize
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2af97fa..cbc83a9 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2135,6 +2135,19 @@ class LogTest {
   }
 
   @Test
+  def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
+
+    log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
+      baseOffset = 1L,
+      magicValue = RecordVersion.V1.value))
+    assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch))
+  }
+
+  @Test
   def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)


Mime
View raw message