This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new bbccf5d KAFKA-10432; LeaderEpochCache is incorrectly recovered for leader epoch 0 (#9219)
bbccf5d is described below
commit bbccf5d448496f595b58c06f0afa5086978c1da5
Author: Lucas Bradstreet <lucas@confluent.io>
AuthorDate: Tue Sep 8 12:43:36 2020 -0700
KAFKA-10432; LeaderEpochCache is incorrectly recovered for leader epoch 0 (#9219)
The leader epoch cache is incorrectly recovered for epoch 0 because the
assignment is skipped when epoch == 0. This check was likely intended to
prevent negative epochs from being applied, or it assumed that epochs
started at 1.
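For illustration only, a minimal Scala sketch of the corrected guard
(EpochCacheLike and maybeAssignOnRecovery are hypothetical names used for
this sketch, not the real LeaderEpochFileCache API):

    // Hypothetical stand-in for the epoch cache, only to illustrate the guard.
    trait EpochCacheLike {
      def latestEpoch: Option[Int]
      def assign(epoch: Int, startOffset: Long): Unit
    }

    // Old guard: batchEpoch > 0 silently skipped epoch 0 during recovery.
    // Fixed guard: batchEpoch >= 0 rejects only negative epochs while still
    // requiring the batch epoch to be newer than the latest cached one.
    def maybeAssignOnRecovery(cache: EpochCacheLike, batchEpoch: Int, baseOffset: Long): Unit = {
      if (batchEpoch >= 0 && cache.latestEpoch.forall(batchEpoch > _))
        cache.assign(batchEpoch, baseOffset)
    }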
A test has been added to LogSegmentTest to show that the LogSegment
recovery path works for the epoch cache. This was a test gap, as none of the
existing recover calls supplied a leader epoch cache to recover.
Reviewers: Jason Gustafson <jason@confluent.io>
---
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 56 ++++++++++++++++++++--
2 files changed, 52 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 61a6e59..b111fb3 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -361,7 +361,7 @@ class LogSegment private[log] (val log: FileRecords,
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
- if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
+ if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5595c90..dc76e97 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -18,6 +18,9 @@ package kafka.log
import java.io.File
+import kafka.server.checkpoints.LeaderEpochCheckpoint
+import kafka.server.epoch.EpochEntry
+import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.TestUtils
import kafka.utils.TestUtils.checkEquals
import org.apache.kafka.common.TopicPartition
@@ -28,6 +31,7 @@ import org.junit.{After, Before, Test}
import scala.jdk.CollectionConverters._
import scala.collection._
+import scala.collection.mutable.ArrayBuffer
class LogSegmentTest {
@@ -316,26 +320,26 @@ class LogSegmentTest {
// append transactional records from pid1
segment.append(largestOffset = 101L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
- shallowOffsetOfMaxTimestamp = 100L, MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
+ shallowOffsetOfMaxTimestamp = 100L, records = MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes),
new SimpleRecord("b".getBytes)))
// append transactional records from pid2
segment.append(largestOffset = 103L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
- shallowOffsetOfMaxTimestamp = 102L, MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
+ shallowOffsetOfMaxTimestamp = 102L, records = MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes),
new SimpleRecord("b".getBytes)))
// append non-transactional records
segment.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
- shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, CompressionType.NONE,
+ shallowOffsetOfMaxTimestamp = 104L, records = MemoryRecords.withRecords(104L, CompressionType.NONE,
partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
segment.append(largestOffset = 106L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
- shallowOffsetOfMaxTimestamp = 106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
+ shallowOffsetOfMaxTimestamp = 106L, records = endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
// commit the transaction from pid1
segment.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
- shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
+ shallowOffsetOfMaxTimestamp = 107L, records = endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
var stateManager = new ProducerStateManager(topicPartition, logDir)
segment.recover(stateManager)
@@ -367,6 +371,48 @@ class LogSegmentTest {
assertEquals(100L, abortedTxn.lastStableOffset)
}
+ /**
+ * Create a segment with some data, then recover the segment.
+ * The epoch cache entries should reflect the segment.
+ */
+ @Test
+ def testRecoveryRebuildsEpochCache(): Unit = {
+ val seg = createSegment(0)
+
+ val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+ private var epochs = Seq.empty[EpochEntry]
+
+ override def write(epochs: Seq[EpochEntry]): Unit = {
+ this.epochs = epochs.toVector
+ }
+
+ override def read(): Seq[EpochEntry] = this.epochs
+ }
+
+ val cache = new LeaderEpochFileCache(topicPartition, () => seg.readNextOffset, checkpoint)
+ seg.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+ shallowOffsetOfMaxTimestamp = 104L, records = MemoryRecords.withRecords(104L, CompressionType.NONE, 0,
+ new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+ seg.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+ shallowOffsetOfMaxTimestamp = 106L, records = MemoryRecords.withRecords(106L, CompressionType.NONE, 1,
+ new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+ seg.append(largestOffset = 109L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+ shallowOffsetOfMaxTimestamp = 108L, records = MemoryRecords.withRecords(108L, CompressionType.NONE, 1,
+ new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+ seg.append(largestOffset = 111L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+ shallowOffsetOfMaxTimestamp = 110, records = MemoryRecords.withRecords(110L, CompressionType.NONE, 2,
+ new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+ seg.recover(new ProducerStateManager(topicPartition, logDir), Some(cache))
+ assertEquals(ArrayBuffer(EpochEntry(epoch = 0, startOffset = 104L),
+ EpochEntry(epoch = 1, startOffset = 106),
+ EpochEntry(epoch = 2, startOffset = 110)),
+ cache.epochEntries)
+ }
+
private def endTxnRecords(controlRecordType: ControlRecordType,
producerId: Long,
producerEpoch: Short,