kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
Date Fri, 03 May 2019 00:51:09 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 8d6e2a2  KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
8d6e2a2 is described below

commit 8d6e2a2aac9231deba1a6c17c0ee530c275d13bf
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu May 2 17:46:01 2019 -0700

    KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
    
    During a partial message format upgrade, it is possible for the message format to flap
    between new and old versions. If we detect that data appended to the log is on an old format,
    we can clear the leader epoch cache so that we revert to truncation by high watermark. Once
    the upgrade completes and all replicas are on the same format, we will append to the epoch
    cache as usual. Note this is related to KAFKA-7897, which handles message format downgrades
    through configuration.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala          | 11 ++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 87179db..df71560 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -909,8 +909,17 @@ class Log(@volatile var dir: File,
 
         // update the epoch cache with the epoch stamped onto the message by the leader
         validRecords.batches.asScala.foreach { batch =>
-          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
             maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
+          } else {
+            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
+            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
+            // to truncation by high watermark after the next leader election.
+            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
+              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
+              cache.clearAndFlush()
+            }
+          }
         }
 
         // check messages set size may be exceed config.segmentSize
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f0604d5..79a462c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2176,6 +2176,19 @@ class LogTest {
   }
 
   @Test
+  def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
+
+    log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
+      baseOffset = 1L,
+      magicValue = RecordVersion.V1.value))
+    assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch))
+  }
+
+  @Test
   def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)


Mime
View raw message