kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10432; LeaderEpochCache is incorrectly recovered for leader epoch 0 (#9219)
Date Tue, 08 Sep 2020 20:01:30 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new bbccf5d  KAFKA-10432; LeaderEpochCache is incorrectly recovered for leader epoch 0 (#9219)
bbccf5d is described below

commit bbccf5d448496f595b58c06f0afa5086978c1da5
Author: Lucas Bradstreet <lucas@confluent.io>
AuthorDate: Tue Sep 8 12:43:36 2020 -0700

    KAFKA-10432; LeaderEpochCache is incorrectly recovered for leader epoch 0 (#9219)
    
    The leader epoch cache is incorrectly recovered for epoch 0 because the
    assignment is skipped whenever epoch == 0. The check was likely intended
    to prevent negative epochs from being applied, or it assumed that epochs
    started at 1.
    
    A test has been added to LogSegmentTest to show that the LogSegment
    recovery path rebuilds the epoch cache. This was a test gap, as none of
    the existing recover calls supplied a leader epoch cache to recover.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/LogSegment.scala     |  2 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala | 56 ++++++++++++++++++++--
 2 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 61a6e59..b111fb3 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -361,7 +361,7 @@ class LogSegment private[log] (val log: FileRecords,
 
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
           leaderEpochCache.foreach { cache =>
-            if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
+            if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
               cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
           }
           updateProducerState(producerStateManager, batch)
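
To see what the one-character change does during recovery, here is a minimal,
self-contained Scala sketch (not Kafka code: Batch and EpochCache are
hypothetical stand-ins for RecordBatch and LeaderEpochFileCache) that replays
the guard over batches with epochs 0, 1 and 2:

    object EpochGuardSketch extends App {
      // Hypothetical stand-in for RecordBatch: only the fields the guard reads.
      case class Batch(partitionLeaderEpoch: Int, baseOffset: Long)

      // Hypothetical stand-in for LeaderEpochFileCache.
      class EpochCache {
        private var entries = Vector.empty[(Int, Long)] // (epoch, startOffset)
        def latestEpoch: Option[Int] = entries.lastOption.map(_._1)
        def assign(epoch: Int, startOffset: Long): Unit =
          entries = entries :+ (epoch -> startOffset)
        override def toString: String = entries.mkString(", ")
      }

      // Batches as they would be replayed from the segment during recover().
      val batches = Seq(Batch(0, 104L), Batch(1, 106L), Batch(2, 110L))

      def recover(guard: Int => Boolean): EpochCache = {
        val cache = new EpochCache
        for (b <- batches)
          if (guard(b.partitionLeaderEpoch) && cache.latestEpoch.forall(b.partitionLeaderEpoch > _))
            cache.assign(b.partitionLeaderEpoch, b.baseOffset)
        cache
      }

      println("old guard (> 0):  " + recover(_ > 0))  // (1,106), (2,110) -- epoch 0 lost
      println("new guard (>= 0): " + recover(_ >= 0)) // (0,104), (1,106), (2,110)
    }

Under the old guard the entry for epoch 0 is never assigned, so a cache rebuilt
during log recovery silently starts at epoch 1; the fix makes any non-negative
epoch eligible while still requiring it to be strictly greater than the latest
cached epoch.
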
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5595c90..dc76e97 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -18,6 +18,9 @@ package kafka.log
 
 import java.io.File
 
+import kafka.server.checkpoints.LeaderEpochCheckpoint
+import kafka.server.epoch.EpochEntry
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
@@ -28,6 +31,7 @@ import org.junit.{After, Before, Test}
 
 import scala.jdk.CollectionConverters._
 import scala.collection._
+import scala.collection.mutable.ArrayBuffer
 
 class LogSegmentTest {
 
@@ -316,26 +320,26 @@ class LogSegmentTest {
 
     // append transactional records from pid1
     segment.append(largestOffset = 101L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-      shallowOffsetOfMaxTimestamp = 100L, MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
+      shallowOffsetOfMaxTimestamp = 100L, records = MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
        pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append transactional records from pid2
     segment.append(largestOffset = 103L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-      shallowOffsetOfMaxTimestamp = 102L, MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
+      shallowOffsetOfMaxTimestamp = 102L, records = MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
        pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append non-transactional records
     segment.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-      shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, CompressionType.NONE,
+      shallowOffsetOfMaxTimestamp = 104L, records = MemoryRecords.withRecords(104L, CompressionType.NONE,
         partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
     segment.append(largestOffset = 106L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-      shallowOffsetOfMaxTimestamp = 106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
+      shallowOffsetOfMaxTimestamp = 106L, records = endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
 
     // commit the transaction from pid1
     segment.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-      shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
+      shallowOffsetOfMaxTimestamp = 107L, records = endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
 
     var stateManager = new ProducerStateManager(topicPartition, logDir)
     segment.recover(stateManager)
@@ -367,6 +371,48 @@ class LogSegmentTest {
     assertEquals(100L, abortedTxn.lastStableOffset)
   }
 
+  /**
+   * Create a segment with some data, then recover the segment.
+   * The epoch cache entries should reflect the segment.
+   */
+  @Test
+  def testRecoveryRebuildsEpochCache(): Unit = {
+    val seg = createSegment(0)
+
+    val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+      private var epochs = Seq.empty[EpochEntry]
+
+      override def write(epochs: Seq[EpochEntry]): Unit = {
+        this.epochs = epochs.toVector
+      }
+
+      override def read(): Seq[EpochEntry] = this.epochs
+    }
+
+    val cache = new LeaderEpochFileCache(topicPartition, () => seg.readNextOffset, checkpoint)
+    seg.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 104L, records = MemoryRecords.withRecords(104L, CompressionType.NONE, 0,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 106L, records = MemoryRecords.withRecords(106L, CompressionType.NONE, 1,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.append(largestOffset = 109L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 108L, records = MemoryRecords.withRecords(108L, CompressionType.NONE, 1,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.append(largestOffset = 111L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 110, records = MemoryRecords.withRecords(110L, CompressionType.NONE, 2,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.recover(new ProducerStateManager(topicPartition, logDir), Some(cache))
+    assertEquals(ArrayBuffer(EpochEntry(epoch = 0, startOffset = 104L),
+                             EpochEntry(epoch = 1, startOffset = 106),
+                             EpochEntry(epoch = 2, startOffset = 110)),
+      cache.epochEntries)
+  }
+
   private def endTxnRecords(controlRecordType: ControlRecordType,
                             producerId: Long,
                             producerEpoch: Short,
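
The new test avoids the filesystem by supplying an in-memory
LeaderEpochCheckpoint. For reference, here is a stripped-down sketch of the
same pattern (EpochEntry and the trait are re-declared in simplified,
hypothetical form rather than imported from kafka.server):

    object InMemoryCheckpointSketch extends App {
      // Simplified stand-ins for kafka.server.epoch.EpochEntry and
      // kafka.server.checkpoints.LeaderEpochCheckpoint.
      case class EpochEntry(epoch: Int, startOffset: Long)

      trait LeaderEpochCheckpoint {
        def write(epochs: Seq[EpochEntry]): Unit
        def read(): Seq[EpochEntry]
      }

      // Hold the checkpointed entries in a field instead of flushing them to
      // a file, so recovery can be exercised without touching the filesystem.
      val checkpoint = new LeaderEpochCheckpoint {
        private var epochs = Seq.empty[EpochEntry]
        override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs.toVector
        override def read(): Seq[EpochEntry] = this.epochs
      }

      checkpoint.write(Seq(EpochEntry(0, 104L), EpochEntry(1, 106L), EpochEntry(2, 110L)))
      println(checkpoint.read()) // Vector(EpochEntry(0,104), EpochEntry(1,106), EpochEntry(2,110))
    }

Note how the test's four appended batches collapse to three expected entries:
epoch 1 appears in two batches, but only its first start offset (106) is
recorded because the guard requires each assigned epoch to be strictly greater
than the latest cached one. The entry for epoch 0 at offset 104 is exactly the
one the old guard dropped.
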

