kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8335; Clean empty batches when sequence numbers are reused (#6715)
Date Mon, 13 May 2019 15:48:07 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d798dbf  KAFKA-8335; Clean empty batches when sequence numbers are reused (#6715)
d798dbf is described below

commit d798dbf497d91509517c60d67ceeea02e2b9b383
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon May 13 08:47:53 2019 -0700

    KAFKA-8335; Clean empty batches when sequence numbers are reused (#6715)
    
    The log cleaner attempts to preserve the last entry for each producerId in order to ensure
that sequence/epoch state is not lost. The current validation checks only the last sequence
number for each producerId in order to decide whether a batch should be retained. There are
two problems with this:
    
    1. Sequence numbers alone are not unique; only the tuple of sequence number and epoch
is uniquely defined.
    2. The group coordinator always writes batches beginning with sequence number 0, so many
batches in the log may share the same sequence number.
    
    The complete fix for the second issue would probably add proper sequence number bookkeeping
in the coordinator. For now, we have left the coordinator implementation unchanged and changed
the cleaner logic to use the last offset written by a producer instead of the last sequence
number.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            |  8 ++++
 core/src/main/scala/kafka/log/LogCleaner.scala     | 27 ++++++++---
 .../scala/kafka/log/ProducerStateManager.scala     |  6 +++
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 53 ++++++++++++++++++++--
 4 files changed, 84 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4e1e3c7..149a4f0 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -733,6 +733,14 @@ class Log(@volatile var dir: File,
     }
   }
 
+  private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized
{
+    producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
+      val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset)
else None
+      val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
+      producerId -> lastRecord
+    }
+  }
+
   /**
    * Check if we have the "clean shutdown" file
    */
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d172920..9c8010c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -540,6 +540,8 @@ private[log] class Cleaner(val id: Int,
       // clean segments into the new destination segment
       val iter = segments.iterator
       var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
+      val lastOffsetOfActiveProducers = log.lastRecordsOfActiveProducers
+
       while (currentSegmentOpt.isDefined) {
         val currentSegment = currentSegmentOpt.get
         val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
@@ -555,7 +557,7 @@ private[log] class Cleaner(val id: Int,
 
         try {
           cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes,
log.config.maxMessageSize,
-            transactionMetadata, log.activeProducersWithLastSequence, stats)
+            transactionMetadata, lastOffsetOfActiveProducers, stats)
         } catch {
           case e: LogSegmentOffsetOverflowException =>
             // Split the current segment. It's also safest to abort the current cleaning
process, so that we retry from
@@ -607,9 +609,9 @@ private[log] class Cleaner(val id: Int,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
-                             activeProducers: Map[Long, Int],
+                             lastRecordsOfActiveProducers: Map[Long, LastRecord],
                              stats: CleanerStats) {
-    val logCleanerFilter = new RecordFilter {
+    val logCleanerFilter: RecordFilter = new RecordFilter {
       var discardBatchRecords: Boolean = _
 
       override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
@@ -617,9 +619,22 @@ private[log] class Cleaner(val id: Int,
         // note that we will never delete a marker until all the records from that transaction
are removed.
         discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers
= retainDeletes)
 
-        // check if the batch contains the last sequence number for the producer. if so,
we cannot
-        // remove the batch just yet or the producer may see an out of sequence error.
-        if (batch.hasProducerId && activeProducers.get(batch.producerId).contains(batch.lastSequence))
+        def isBatchLastRecordOfProducer: Boolean = {
+          // We retain the batch in order to preserve the state of active producers. There
are three cases:
+          // 1) The producer is no longer active, which means we can delete all records for
that producer.
+          // 2) The producer is still active and has a last data offset. We retain the batch
that contains
+          //    this offset since it also contains the last sequence number for this producer.
+          // 3) The last entry in the log is a transaction marker. We retain this marker
since it has the
+          //    last producer epoch, which is needed to ensure fencing.
+          lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
+            lastRecord.lastDataOffset match {
+              case Some(offset) => batch.lastOffset == offset
+              case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
+            }
+          }
+        }
+
+        if (batch.hasProducerId && isBatchLastRecordOfProducer)
           BatchRetention.RETAIN_EMPTY
         else if (discardBatchRecords)
           BatchRetention.DELETE
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 3db436e..5632b7b 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -37,6 +37,12 @@ import scala.collection.{immutable, mutable}
 
 class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 
+/**
+ * The last written record for a given producer. The last data offset may be undefined
+ * if the only log entry for a producer is a transaction marker.
+ */
+case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
+
 
 // ValidationType and its subtypes define the extent of the validation to perform on a given
ProducerAppendInfo instance
 private[log] sealed trait ValidationType
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 321800a..00c9a18 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -527,6 +527,45 @@ class LogCleanerTest {
   }
 
   @Test
+  def testEmptyBatchRemovalWithSequenceReuse(): Unit = {
+    // The group coordinator always writes batches beginning with sequence number 0. This
test
+    // ensures that we still remove old empty batches and transaction markers under this
expectation.
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
isFromClient = false)
+    appendFirstTransaction(Seq(1))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient
= false)
+
+    val appendSecondTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
isFromClient = false)
+    appendSecondTransaction(Seq(2))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient
= false)
+
+    log.appendAsLeader(record(1, 1), leaderEpoch = 0)
+    log.appendAsLeader(record(2, 1), leaderEpoch = 0)
+
+    // Roll the log to ensure that the data is cleanable.
+    log.roll()
+
+    // Both transactional batches will be cleaned. The last one will remain in the log
+    // as an empty batch in order to preserve the producer sequence number and epoch
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
+    assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
+
+    // On the second round of cleaning, the marker from the first transaction should be removed.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(3, 4, 5), offsetsInLog(log))
+    assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRetentionWithEmptyBatch(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
@@ -1521,13 +1560,19 @@ class LogCleanerTest {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short):
Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
+  private def appendTransactionalAsLeader(log: Log,
+                                          producerId: Long,
+                                          producerEpoch: Short,
+                                          leaderEpoch: Int = 0,
+                                          isFromClient: Boolean = true): Seq[Int] => LogAppendInfo
= {
+    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient
= isFromClient)
   }
 
   private def appendIdempotentAsLeader(log: Log, producerId: Long,
                                        producerEpoch: Short,
-                                       isTransactional: Boolean = false): Seq[Int] =>
LogAppendInfo = {
+                                       isTransactional: Boolean = false,
+                                       leaderEpoch: Int = 0,
+                                       isFromClient: Boolean = true): Seq[Int] => LogAppendInfo
= {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>
@@ -1539,7 +1584,7 @@ class LogCleanerTest {
       else
         MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch,
sequence, simpleRecords: _*)
       sequence += simpleRecords.size
-      log.appendAsLeader(records, leaderEpoch = 0)
+      log.appendAsLeader(records, leaderEpoch, isFromClient)
     }
   }
 


Mime
View raw message