kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-6003; Accept appends on replicas unconditionally when local producer state doesn't exist
Date Thu, 05 Oct 2017 05:28:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 5792f2fb3 -> dc907d9b7


KAFKA-6003; Accept appends on replicas unconditionally when local producer state doesn't exist

Without this patch, if the replica's log was somehow truncated before
the leader's, it is possible for the replica fetcher thread to
continuously throw an OutOfOrderSequenceException because the
incoming sequence would be non-zero and there is no local state.

This patch changes the behavior so that the replica state is updated to
the leader's state if there was no local state for the producer at the
time of the append.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #4004 from apurvam/KAFKA-6003-handle-unknown-producer-on-replica

(cherry picked from commit 6ea4fffdd287a0c6a02c1b6dc1006b1a7b614405)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc907d9b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc907d9b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc907d9b

Branch: refs/heads/1.0
Commit: dc907d9b7b9b0ed719c228237640d5049bb8e483
Parents: 5792f2f
Author: Apurva Mehta <apurva@confluent.io>
Authored: Wed Oct 4 22:27:03 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed Oct 4 22:27:53 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   8 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 .../scala/kafka/log/ProducerStateManager.scala  | 101 ++++++++++++-------
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   5 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  15 ++-
 .../kafka/log/ProducerStateManagerTest.scala    |  64 ++++++++++--
 6 files changed, 134 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc907d9b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index dc47194..d397ca6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -514,7 +514,7 @@ class Log(@volatile var dir: File,
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
       if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(batch, loadedProducers, loadingFromLog =
true)
+        val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
     }
@@ -791,7 +791,7 @@ class Log(@volatile var dir: File,
           return (updatedProducers, completedTxns.toList, Some(duplicate))
         }
 
-      val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false)
+      val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
       maybeCompletedTxn.foreach(completedTxns += _)
     }
     (updatedProducers, completedTxns.toList, None)
@@ -878,9 +878,9 @@ class Log(@volatile var dir: File,
 
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
-                              loadingFromLog: Boolean): Option[CompletedTxn] = {
+                              isFromClient: Boolean): Option[CompletedTxn] = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId,
loadingFromLog))
+    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId,
isFromClient))
     appendInfo.append(batch)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc907d9b/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 06c4e2d..845f08f 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -145,7 +145,7 @@ class LogSegment(val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch):
Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val appendInfo = producerStateManager.prepareUpdate(producerId, loadingFromLog = true)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc907d9b/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 1cf9a14..81726c1 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -36,6 +36,15 @@ import scala.collection.{immutable, mutable}
 
 class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 
+
+// ValidationType and its subtypes define the extent of the validation to perform on a given
ProducerAppendInfo instance
+private[log] sealed trait ValidationType
+private[log] object ValidationType {
+  case object None extends ValidationType
+  case object EpochOnly extends ValidationType
+  case object Full extends ValidationType
+}
+
 private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata,
var lastOffset: Option[Long] = None) {
   def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
 
@@ -138,49 +147,58 @@ private[log] class ProducerIdEntry(val producerId: Long, val batchMetadata:
muta
  *                      the most recent appends made by the producer. Validation of the first
incoming append will
  *                      be made against the lastest append in the current entry. New appends
will replace older appends
  *                      in the current entry so that the space overhead is constant.
- * @param validateSequenceNumbers Whether or not sequence numbers should be validated. The
only current use
- *                                of this is the consumer offsets topic which uses producer
ids from incoming
- *                                TxnOffsetCommit, but has no sequence number to validate
and does not depend
- *                                on the deduplication which sequence numbers provide.
- * @param loadingFromLog This parameter indicates whether the new append is being loaded
directly from the log.
- *                       This is used to repopulate producer state when the broker is initialized.
The only
- *                       difference in behavior is that we do not validate the sequence number
of the first append
- *                       since we may have lost previous sequence numbers when segments were
removed due to log
- *                       retention enforcement.
+ * @param validationType Indicates the extent of validation to perform on the appends on
this instance. Offset commits
+ *                       coming from the producer should have ValidationType.EpochOnly. Appends
which aren't from a client
+ *                       will not be validated at all, and should be set to ValidationType.None.
All other appends should
+ *                       have ValidationType.Full.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       currentEntry: ProducerIdEntry,
-                                      validateSequenceNumbers: Boolean,
-                                      loadingFromLog: Boolean) {
+                                      validationType: ValidationType) {
 
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
+  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
+    validationType match {
+      case ValidationType.None =>
+
+      case ValidationType.EpochOnly =>
+        checkEpoch(producerEpoch)
+
+      case ValidationType.Full =>
+        checkEpoch(producerEpoch)
+        checkSequence(producerEpoch, firstSeq, lastSeq)
+    }
+  }
+
+  private def checkEpoch(producerEpoch: Short): Unit = {
     if (isFenced(producerEpoch)) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably
another producer " +
         s"with a newer epoch. $producerEpoch (request epoch), ${currentEntry.producerEpoch}
(server epoch)")
-    } else if (validateSequenceNumbers) {
-      if (producerEpoch != currentEntry.producerEpoch) {
-        if (firstSeq != 0) {
-          if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-            throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch:
$producerEpoch " +
-              s"(request epoch), $firstSeq (seq. number)")
-          } else {
-            throw new UnknownProducerIdException(s"Found no record of producerId=$producerId
on the broker. It is possible " +
-              s"that the last message with the producerId=$producerId has been removed due
to hitting the retention limit.")
-          }
+    }
+  }
+
+  private def checkSequence(producerEpoch: Short, firstSeq: Int, lastSeq: Int): Unit = {
+    if (producerEpoch != currentEntry.producerEpoch) {
+      if (firstSeq != 0) {
+        if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch:
$producerEpoch " +
+            s"(request epoch), $firstSeq (seq. number)")
+        } else {
+          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId
on the broker. It is possible " +
+            s"that the last message with the producerId=$producerId has been removed due
to hitting the retention limit.")
         }
-      } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0)
{
-        // the epoch was bumped by a control record, so we expect the sequence number to
be reset
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: found $firstSeq " +
-          s"(incoming seq. number), but expected 0")
-      } else if (isDuplicate(firstSeq, lastSeq)) {
-        throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId:
(incomingBatch.firstSeq, " +
-          s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).")
-      } else if (!inSequence(firstSeq, lastSeq)) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: $firstSeq " +
-          s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)")
       }
+    } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0)
{
+      // the epoch was bumped by a control record, so we expect the sequence number to be
reset
+      throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: found $firstSeq " +
+        s"(incoming seq. number), but expected 0")
+    } else if (isDuplicate(firstSeq, lastSeq)) {
+      throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId:
(incomingBatch.firstSeq, " +
+        s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).")
+    } else if (!inSequence(firstSeq, lastSeq)) {
+      throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: $firstSeq " +
+        s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)")
     }
   }
 
@@ -216,10 +234,7 @@ private[log] class ProducerAppendInfo(val producerId: Long,
              lastTimestamp: Long,
              lastOffset: Long,
              isTransactional: Boolean): Unit = {
-    if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
-      // skip validation if this is the first entry when loading from the log. Log retention
-      // will generally have removed the beginning entries from each producer id
-      validateAppend(epoch, firstSeq, lastSeq)
+    maybeValidateAppend(epoch, firstSeq, lastSeq)
 
     currentEntry.addBatchMetadata(epoch, lastSeq, lastOffset, lastSeq - firstSeq, lastTimestamp)
 
@@ -541,9 +556,17 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
-  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
-    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)),
validateSequenceNumbers,
-      loadingFromLog)
+  def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo = {
+    val validationToPerform =
+      if (!isFromClient)
+        ValidationType.None
+      else if (topicPartition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        ValidationType.EpochOnly
+      else
+        ValidationType.Full
+
+    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)),
validationToPerform)
+  }
 
   /**
    * Update the mapping with the given append information

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc907d9b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 0f866e7..cef2bca 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -274,7 +274,7 @@ class LogSegmentTest {
     val segment = createSegment(100)
     val producerEpoch = 0.toShort
     val partitionLeaderEpoch = 15
-    val sequence = 0
+    val sequence = 100
 
     val pid1 = 5L
     val pid2 = 10L
@@ -317,7 +317,8 @@ class LogSegmentTest {
 
     // recover again, but this time assuming the transaction from pid2 began on a previous
segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
-    stateManager.loadProducerEntry(new ProducerIdEntry(pid2, mutable.Queue[BatchMetadata](BatchMetadata(10,
90L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L)))
+    stateManager.loadProducerEntry(new ProducerIdEntry(pid2,
+      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)),
producerEpoch, 0, Some(75L)))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc907d9b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2ae62c5..6d40967 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -869,7 +869,7 @@ class LogTest {
     }
   }
 
-  @Test(expected = classOf[DuplicateSequenceException])
+  @Test
   def testDuplicateAppendToFollower() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -877,15 +877,20 @@ class LogTest {
     val pid = 1L
     val baseSequence = 0
     val partitionLeaderEpoch = 0
+    // The point of this test is to ensure that validation isn't performed on the follower.
     // this is a bit contrived. to trigger the duplicate case for a follower append, we have
to append
     // a batch with matching sequence numbers, but valid increasing offsets
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, CompressionType.NONE, pid,
epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, CompressionType.NONE, pid,
epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    // Ensure that even the duplicate sequences are accepted on the follower.
+    assertEquals(4L, log.logEndOffset)
   }
 
-  @Test(expected = classOf[DuplicateSequenceException])
+  @Test
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -930,9 +935,11 @@ class LogTest {
 
     val records = MemoryRecords.readableRecords(buffer)
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+
+    // Ensure that batches with duplicates are accepted on the follower.
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(records)
-    // Should throw a duplicate sequence exception here.
-    fail("should have thrown a DuplicateSequenceNumberException.")
+    assertEquals(5L, log.logEndOffset)
   }
 
   @Test(expected = classOf[ProducerFencedException])

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc907d9b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9eb9ae7..8650624 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -86,7 +86,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
 
     append(stateManager, producerId, epoch, 0, offset + 500)
 
@@ -105,7 +105,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
     append(stateManager, producerId, epoch, 1, offset + 500)
   }
 
@@ -114,7 +114,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 16
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
@@ -159,8 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId),
validateSequenceNumbers = true,
-      loadingFromLog = false)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId),
ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional
= true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset
= 990000L,
@@ -176,8 +175,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId),
validateSequenceNumbers = true,
-      loadingFromLog = false)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId),
ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional
= true)
 
     // use some other offset to simulate a follower append where the log offset metadata
won't typically
@@ -197,7 +195,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 9L
     append(stateManager, producerId, producerEpoch, 0, offset)
 
-    val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
+    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
     appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
     var lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
@@ -321,6 +319,50 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testAcceptAppendWithoutProducerStateOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    append(stateManager, producerId, epoch, 1, 1L, 1)
+
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 1L, 70000)
+
+    val sequence = 2
+    // entry added after recovery. The pid should be expired now, and would not exist in
the pid mapping. Nonetheless
+    // the append on a replica should be accepted with the local producer state updated to
the appended value.
+    assertFalse(recoveredMapping.activeProducers.contains(producerId))
+    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, isFromClient = false)
+    assertTrue(recoveredMapping.activeProducers.contains(producerId))
+    val producerIdEntry = recoveredMapping.activeProducers.get(producerId).head
+    assertEquals(epoch, producerIdEntry.producerEpoch)
+    assertEquals(sequence, producerIdEntry.firstSeq)
+    assertEquals(sequence, producerIdEntry.lastSeq)
+  }
+
+  @Test
+  def testAcceptAppendWithSequenceGapsOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    val outOfOrderSequence = 3
+
+    // First we ensure that an OutOfOrderSequenceException is raised when the append
comes from a client.
+    try {
+      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = true)
+      fail("Expected an OutOfOrderSequenceException to be raised.")
+    } catch {
+      case _ : OutOfOrderSequenceException =>
+      // Good!
+      case _ : Exception =>
+        fail("Expected an OutOfOrderSequenceException to be raised.")
+    }
+
+    assertEquals(0L, stateManager.activeProducers(producerId).lastSeq)
+    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = false)
+    assertEquals(outOfOrderSequence, stateManager.activeProducers(producerId).lastSeq)
+  }
+
+  @Test
   def testDeleteSnapshotsBefore(): Unit = {
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, 0, 0L)
@@ -675,7 +717,7 @@ class ProducerStateManagerTest extends JUnitSuite {
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
                                  timestamp: Long = time.milliseconds()): (CompletedTxn, Long)
= {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch,
offset, timestamp)
     mapping.update(producerAppendInfo)
@@ -691,8 +733,8 @@ class ProducerStateManagerTest extends JUnitSuite {
                      offset: Long,
                      timestamp: Long = time.milliseconds(),
                      isTransactional: Boolean = false,
-                     isLoadingFromLog: Boolean = false): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isLoadingFromLog)
+                     isFromClient : Boolean = true): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
     producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)


Mime
View raw message