kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722)
Date Sat, 25 May 2019 06:22:42 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 2ae66eb  KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722)
2ae66eb is described below

commit 2ae66eba6a311d191709cb2234561bd810c971c5
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Fri May 24 23:10:56 2019 -0700

    KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722)
    
    When cleaning transactional data, we need to keep track of which transactions still
    have data associated with them so that we do not remove the markers. We had logic to
    do this, but the state was not being carried over when beginning cleaning for a new
    set of segments. This could cause the cleaner to incorrectly believe a transaction
    marker was no longer needed. The fix here carries the transactional state between
    groups of segments to be cleaned.
    
    Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Viktor Somogyi <viktorsomogyi@gmail.com>, Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  70 ++++++------
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 117 ++++++++++++++++-----
 2 files changed, 129 insertions(+), 58 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4024741..7bc03c4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -321,7 +321,7 @@ class LogCleaner(initialConfig: CleanerConfig,
         }
         val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
         try {
-          deletable.foreach { case (topicPartition, log) =>
+          deletable.foreach { case (_, log) =>
             currentLog = Some(log)
             log.deleteOldSegments()
           }
@@ -508,8 +508,12 @@ private[log] class Cleaner(val id: Int,
 
     // group the segments and clean the groups
     info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name,
new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
-    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
log.config.maxIndexSize, cleanable.firstUncleanableOffset))
-      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
+    val transactionMetadata = new CleanedTransactionMetadata
+
+    val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
+      log.config.maxIndexSize, cleanable.firstUncleanableOffset)
+    for (group <- groupedSegments)
+      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
 
     // record buffer utilization
     stats.bufferUtilization = offsetMap.utilization
@@ -527,14 +531,18 @@ private[log] class Cleaner(val id: Int,
    * @param map The offset map to use for cleaning segments
    * @param deleteHorizonMs The time to retain delete tombstones
    * @param stats Collector for cleaning statistics
+   * @param transactionMetadata State of ongoing transactions which is carried between the
cleaning
+   *                            of the grouped segments
    */
   private[log] def cleanSegments(log: Log,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
                                  deleteHorizonMs: Long,
-                                 stats: CleanerStats) {
+                                 stats: CleanerStats,
+                                 transactionMetadata: CleanedTransactionMetadata): Unit =
{
     // create a new segment with a suffix appended to the name of the log and indexes
     val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
+    transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
 
     try {
       // clean segments into the new destination segment
@@ -549,7 +557,7 @@ private[log] class Cleaner(val id: Int,
         val startOffset = currentSegment.baseOffset
         val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset
+ 1)
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
-        val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex))
+        transactionMetadata.addAbortedTransactions(abortedTransactions)
 
         val retainDeletes = currentSegment.lastModified > deleteHorizonMs
         info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new
Date(currentSegment.largestTimestamp)}) " +
@@ -857,8 +865,9 @@ private[log] class Cleaner(val id: Int,
     val dirty = log.logSegments(start, end).toBuffer
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name,
dirty.size, start, end))
 
+    val transactionMetadata = new CleanedTransactionMetadata
     val abortedTransactions = log.collectAbortedTransactions(start, end)
-    val transactionMetadata = CleanedTransactionMetadata(abortedTransactions)
+    transactionMetadata.addAbortedTransactions(abortedTransactions)
 
     // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section
of the log)
@@ -1008,29 +1017,26 @@ private case class LogToClean(topicPartition: TopicPartition, log:
Log, firstDir
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
 }
 
-private[log] object CleanedTransactionMetadata {
-  def apply(abortedTransactions: List[AbortedTxn],
-            transactionIndex: Option[TransactionIndex] = None): CleanedTransactionMetadata
= {
-    val queue = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
-      override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
-    }.reverse)
-    queue ++= abortedTransactions
-    new CleanedTransactionMetadata(queue, transactionIndex)
-  }
-
-  val Empty = CleanedTransactionMetadata(List.empty[AbortedTxn])
-}
-
 /**
- * This is a helper class to facilitate tracking transaction state while cleaning the log.
It is initialized
- * with the aborted transactions from the transaction index and its state is updated as the
cleaner iterates through
- * the log during a round of cleaning. This class is responsible for deciding when transaction
markers can
- * be removed and is therefore also responsible for updating the cleaned transaction index
accordingly.
+ * This is a helper class to facilitate tracking transaction state while cleaning the log.
It maintains a set
+ * of the ongoing aborted and committed transactions as the cleaner is working its way through
the log. This
+ * class is responsible for deciding when transaction markers can be removed and is therefore
also responsible
+ * for updating the cleaned transaction index accordingly.
  */
-private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn],
-                                              val transactionIndex: Option[TransactionIndex]
= None) {
-  val ongoingCommittedTxns = mutable.Set.empty[Long]
-  val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
+private[log] class CleanedTransactionMetadata {
+  private val ongoingCommittedTxns = mutable.Set.empty[Long]
+  private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
+  // Minheap of aborted transactions sorted by the transaction first offset
+  private var abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn]
{
+    override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+  }.reverse)
+
+  // Output cleaned index to write retained aborted transactions
+  var cleanedIndex: Option[TransactionIndex] = None
+
+  def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
+    this.abortedTransactions ++= abortedTransactions
+  }
 
   /**
    * Update the cleaned transaction state with a control batch that has just been traversed
by the cleaner.
@@ -1047,9 +1053,11 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions:
mutable.P
       controlType match {
         case ControlRecordType.ABORT =>
           ongoingAbortedTxns.remove(producerId) match {
-            // Retain the marker until all batches from the transaction have been removed
+            // Retain the marker until all batches from the transaction have been removed.
+            // We may retain a record from an aborted transaction if it is the last entry
+            // written by a given producerId.
             case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined
=>
-              transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+              cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
               false
             case _ => true
           }
@@ -1069,7 +1077,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions:
mutable.P
   private def consumeAbortedTxnsUpTo(offset: Long): Unit = {
     while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) {
       val abortedTxn = abortedTransactions.dequeue()
-      ongoingAbortedTxns += abortedTxn.producerId -> new AbortedTransactionMetadata(abortedTxn)
+      ongoingAbortedTxns.getOrElseUpdate(abortedTxn.producerId, new AbortedTransactionMetadata(abortedTxn))
     }
   }
 
@@ -1097,4 +1105,6 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions:
mutable.P
 
 private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) {
   var lastObservedBatchOffset: Option[Long] = None
+
+  override def toString: String = s"(txn: $abortedTxn, lastOffset: $lastObservedBatchOffset)"
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0dcfeec..17b5a24 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -84,7 +84,7 @@ class LogCleanerTest extends JUnitSuite {
     val segments = log.logSegments.take(3).toSeq
     val stats = new CleanerStats()
     val expectedBytesRead = segments.map(_.size).sum
-    cleaner.cleanSegments(log, segments, map, 0L, stats)
+    cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
     assertEquals(expectedBytesRead, stats.bytesRead)
@@ -151,7 +151,7 @@ class LogCleanerTest extends JUnitSuite {
     val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
     val stats = new CleanerStats()
     cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
-    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats)
+    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata)
 
     // Validate based on the file name that log segment file is renamed exactly once for
async deletion
     assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -432,37 +432,37 @@ class LogCleanerTest extends JUnitSuite {
     log.roll()
 
     // first time through the records are removed
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit},
{2}, {3}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit},
{2}, {3}, {Producer1: Commit}]
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6), offsetsInLog(log))
-    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // the empty batch remains if cleaned again because it still holds the last sequence
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit},
{2}, {3}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit},
{2}, {3}, {Producer1: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6), offsetsInLog(log))
-    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // append a new record from the producer to allow cleaning of the empty batch
-    // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
[{Producer2: 1}, {Producer2: Commit}]
-    //  {1},                     {3},                     {4},                 {5}, {6},
 {8},            {9} ==> Offsets
+    // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3},
{Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
+    //  {1},                     {3},                     {4},                 {5}, {6},
{7},                 {8},            {9} ==> Offsets
     producer2(Seq(1)) // offset 8
     log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false)
// offset 9
     log.roll()
 
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer2:
1}, {Producer2: Commit}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1:
Commit}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6, 8, 9), offsetsInLog(log))
-    assertEquals(List(1, 4, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
 
-    // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
+    // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2:
1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(5, 6, 8, 9), offsetsInLog(log))
-    assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
   }
 
   @Test
@@ -497,6 +497,59 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testCommittedTransactionSpanningSegments(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+
+    val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
+    appendTransaction(Seq(1))
+    log.roll()
+
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient
= false)
+    log.roll()
+
+    // Both the record and the marker should remain after cleaning
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(0, 1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
+  def testAbortedTransactionSpanningSegments(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+
+    val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
+    appendTransaction(Seq(1))
+    log.roll()
+
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient
= false)
+    log.roll()
+
+    // Both the batch and the marker should remain after cleaning. The batch is retained
+    // because it is the last entry for this producerId. The marker is retained because
+    // there are still batches remaining from this transaction.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+
+    // The empty batch and the marker is still retained after a second cleaning.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
@@ -650,7 +703,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // clean the log
     val stats = new CleanerStats()
-    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
   }
@@ -663,7 +716,7 @@ class LogCleanerTest extends JUnitSuite {
     val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024
* 1024)
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats,
new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
   }
@@ -682,7 +735,7 @@ class LogCleanerTest extends JUnitSuite {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     intercept[CorruptRecordException] {
-      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats,
new CleanedTransactionMetadata)
     }
   }
 
@@ -699,7 +752,7 @@ class LogCleanerTest extends JUnitSuite {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     intercept[CorruptRecordException] {
-      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats,
new CleanedTransactionMetadata)
     }
   }
 
@@ -1030,7 +1083,8 @@ class LogCleanerTest extends JUnitSuite {
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     intercept[LogCleaningAbortedException] {
-      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     }
   }
 
@@ -1239,7 +1293,8 @@ class LogCleanerTest extends JUnitSuite {
 
     // Try to clean segment with offset overflow. This will trigger log split and the cleaning
itself must abort.
     assertThrows[LogCleaningAbortedException] {
-      cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     }
     assertEquals(numSegmentsInitial + 1, log.logSegments.size)
     assertEquals(allKeys, LogTest.keysInLog(log))
@@ -1247,7 +1302,8 @@ class LogCleanerTest extends JUnitSuite {
 
     // Clean each segment now that split is complete.
     for (segmentToClean <- log.logSegments)
-      cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log))
     assertFalse(LogTest.hasOffsetOverflow(log))
     log.close()
@@ -1287,7 +1343,8 @@ class LogCleanerTest extends JUnitSuite {
       offsetMap.put(key(k), Long.MaxValue)
 
     // clean the log
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     var cleanedKeys = LogTest.keysInLog(log)
@@ -1302,7 +1359,8 @@ class LogCleanerTest extends JUnitSuite {
     log = recoverAndCheck(config, allKeys)
 
     // clean again
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1323,7 +1381,8 @@ class LogCleanerTest extends JUnitSuite {
     }
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1340,7 +1399,8 @@ class LogCleanerTest extends JUnitSuite {
     }
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1568,7 +1628,8 @@ class LogCleanerTest extends JUnitSuite {
     appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient
= isFromClient)
   }
 
-  private def appendIdempotentAsLeader(log: Log, producerId: Long,
+  private def appendIdempotentAsLeader(log: Log,
+                                       producerId: Long,
                                        producerEpoch: Short,
                                        isTransactional: Boolean = false,
                                        leaderEpoch: Int = 0,


Mime
View raw message