kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration (#7687)
Date Thu, 09 Jan 2020 21:59:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new b13a7ff  KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration (#7687)
b13a7ff is described below

commit b13a7ff7077f05b9dd21a3cd21fed6d0da5b8cdd
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Jan 9 13:47:26 2020 -0800

    KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration (#7687)
    
    Existing producer state expiration uses timestamps from data records only and not from transaction markers. This can cause premature producer expiration when the coordinator times out a transaction because we drop the state from existing batches. This in turn can allow the coordinator epoch to revert to a previous value, which can lead to validation failures during log recovery. This patch fixes the problem by also leveraging the timestamp from transaction markers.
    
    We also change the validation logic so that coordinator epoch is verified only for new marker appends. When replicating from the leader and when recovering the log, we only log a warning if we notice that the coordinator epoch has gone backwards. This allows recovery from previous occurrences of this bug.
    
    Finally, this patch fixes one minor issue when loading producer state from the snapshot file. When the only record for a given producer is a control record, the "last offset" field will be set to -1 in the snapshot. We should check for this case when loading to be sure we recover the state consistently.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/common/record/DefaultRecordBatch.java    |  16 +--
 .../common/record/DefaultRecordBatchTest.java      |   6 +
 core/src/main/scala/kafka/cluster/Partition.scala  |   4 +-
 .../coordinator/group/GroupMetadataManager.scala   |   5 +-
 .../transaction/TransactionStateManager.scala      |  12 +-
 core/src/main/scala/kafka/log/Log.scala            |  48 ++++---
 core/src/main/scala/kafka/log/LogSegment.scala     |   2 +-
 core/src/main/scala/kafka/log/LogValidator.scala   | 101 +++++++++-----
 .../scala/kafka/log/ProducerStateManager.scala     | 154 ++++++++++-----------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   5 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   8 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  19 ++-
 .../AbstractCoordinatorConcurrencyTest.scala       |   4 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |   7 +-
 .../group/GroupMetadataManagerTest.scala           |  24 ++--
 .../transaction/TransactionStateManagerTest.scala  |  15 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |   3 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  64 +++++----
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |   3 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 117 +++++++++++-----
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  80 +++++------
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 140 +++++++++++++------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   7 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  16 ++-
 24 files changed, 509 insertions(+), 351 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 6d79b26..d4a9587 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -534,16 +534,16 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }
 
-    public static int incrementSequence(int baseSequence, int increment) {
-        if (baseSequence > Integer.MAX_VALUE - increment)
-            return increment - (Integer.MAX_VALUE - baseSequence) - 1;
-        return baseSequence + increment;
+    public static int incrementSequence(int sequence, int increment) {
+        if (sequence > Integer.MAX_VALUE - increment)
+            return increment - (Integer.MAX_VALUE - sequence) - 1;
+        return sequence + increment;
     }
 
-    public static int decrementSequence(int baseSequence, int decrement) {
-        if (baseSequence < decrement)
-            return Integer.MAX_VALUE - (decrement - baseSequence) + 1;
-        return baseSequence - decrement;
+    public static int decrementSequence(int sequence, int decrement) {
+        if (sequence < decrement)
+            return Integer.MAX_VALUE - (decrement - sequence) + 1;
+        return sequence - decrement;
     }
 
     private abstract class RecordIterator implements CloseableIterator<Record> {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index beee10a..2d42c03 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -410,6 +410,12 @@ public class DefaultRecordBatchTest {
         assertEquals(4, DefaultRecordBatch.incrementSequence(Integer.MAX_VALUE - 5, 10));
     }
 
+    @Test
+    public void testDecrementSequence() {
+        assertEquals(0, DefaultRecordBatch.decrementSequence(5, 5));
+        assertEquals(Integer.MAX_VALUE, DefaultRecordBatch.decrementSequence(0, 1));
+    }
+
     private static DefaultRecordBatch recordsWithInvalidRecordCount(Byte magicValue, long timestamp,
                                               CompressionType codec, int invalidCount) {
         ByteBuffer buf = ByteBuffer.allocate(512);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index bbbf0ca..a8796eb 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -938,7 +938,7 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
+  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderLogIfLocal match {
         case Some(leaderLog) =>
@@ -951,7 +951,7 @@ class Partition(val topicPartition: TopicPartition,
               s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
           }
 
-          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
+          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
             interBrokerProtocolVersion)
 
           // we may need to increment high watermark since ISR could be down to 1
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c25e0cf..ce04269 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
+import kafka.log.AppendOrigin
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{FetchLogEnd, ReplicaManager}
 import kafka.utils.CoreUtils.inLock
@@ -315,7 +316,7 @@ class GroupMetadataManager(brokerId: Int,
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
-      isFromClient = false,
+      origin = AppendOrigin.Coordinator,
       entriesPerPartition = records,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback)
@@ -830,7 +831,7 @@ class GroupMetadataManager(brokerId: Int,
                 // do not need to require acks since even if the tombstone is lost,
                 // it will be appended again in the next purge cycle
                 val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones.toArray: _*)
-                partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0)
+                partition.appendRecordsToLeader(records, origin = AppendOrigin.Coordinator, requiredAcks = 0)
 
                 offsetsRemoved += removedOffsets.size
                 trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2e8bf7f..7d731a1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,24 +22,24 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.log.LogConfig
+import kafka.log.{AppendOrigin, LogConfig}
 import kafka.message.UncompressedCodec
 import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.stats.{Avg, Max}
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.stats.{Avg, Max}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 
 object TransactionStateManager {
@@ -211,7 +211,7 @@ class TransactionStateManager(brokerId: Int,
           config.requestTimeoutMs,
           TransactionLog.EnforcedRequiredAcks,
           internalTopicsAllowed = true,
-          isFromClient = false,
+          origin = AppendOrigin.Coordinator,
           recordsPerPartition,
           removeFromCacheCallback,
           Some(stateLock.readLock)
@@ -638,7 +638,7 @@ class TransactionStateManager(brokerId: Int,
                 newMetadata.txnTimeoutMs.toLong,
                 TransactionLog.EnforcedRequiredAcks,
                 internalTopicsAllowed = true,
-                isFromClient = false,
+                origin = AppendOrigin.Coordinator,
                 recordsPerPartition,
                 updateCacheCallback,
                 delayedProduceLock = Some(stateLock.readLock))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 42697c7..c2cd2e2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -902,7 +902,7 @@ class Log(@volatile var dir: File,
         val maybeCompletedTxn = updateProducers(batch,
           loadedProducers,
           firstOffsetMetadata = None,
-          isFromClient = false)
+          origin = AppendOrigin.Replication)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
     }
@@ -991,14 +991,16 @@ class Log(@volatile var dir: File,
    * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
    *
    * @param records The records to append
-   * @param isFromClient Whether or not this append is from a producer
+   * @param origin Declares the origin of the append which affects required validations
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @return Information about the appended messages including the first and last offset.
    */
-  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
+  def appendAsLeader(records: MemoryRecords,
+                     leaderEpoch: Int,
+                     origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
+    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
   }
 
   /**
@@ -1009,7 +1011,11 @@ class Log(@volatile var dir: File,
    * @return Information about the appended messages including the first and last offset.
    */
   def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
-    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
+    append(records,
+      origin = AppendOrigin.Replication,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      assignOffsets = false,
+      leaderEpoch = -1)
   }
 
   /**
@@ -1019,7 +1025,7 @@ class Log(@volatile var dir: File,
    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
    *
    * @param records The log records to append
-   * @param isFromClient Whether or not this append is from a producer
+   * @param origin Declares the origin of the append which affects required validations
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
@@ -1028,9 +1034,13 @@ class Log(@volatile var dir: File,
    * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
    * @return Information about the appended messages including the first and last offset.
    */
-  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
+  private def append(records: MemoryRecords,
+                     origin: AppendOrigin,
+                     interBrokerProtocolVersion: ApiVersion,
+                     assignOffsets: Boolean,
+                     leaderEpoch: Int): LogAppendInfo = {
     maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
-      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
+      val appendInfo = analyzeAndValidateRecords(records, origin)
 
       // return if we have no valid messages or if this is a duplicate of the last appended entry
       if (appendInfo.shallowCount == 0)
@@ -1060,7 +1070,7 @@ class Log(@volatile var dir: File,
               config.messageTimestampType,
               config.messageTimestampDifferenceMaxMs,
               leaderEpoch,
-              isFromClient,
+              origin,
               interBrokerProtocolVersion,
               brokerTopicStats)
           } catch {
@@ -1146,7 +1156,7 @@ class Log(@volatile var dir: File,
         // now that we have valid records, offsets assigned, and timestamps updated, we need to
         // validate the idempotent/transactional state of the producers and collect some metadata
         val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
-          logOffsetMetadata, validRecords, isFromClient)
+          logOffsetMetadata, validRecords, origin)
 
         maybeDuplicate.foreach { duplicate =>
           appendInfo.firstOffset = Some(duplicate.firstOffset)
@@ -1263,7 +1273,7 @@ class Log(@volatile var dir: File,
 
   private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
                                               records: MemoryRecords,
-                                              isFromClient: Boolean):
+                                              origin: AppendOrigin):
   (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
     val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
@@ -1275,7 +1285,7 @@ class Log(@volatile var dir: File,
 
         // if this is a client produce request, there will be up to 5 batches which could have been duplicated.
         // If we find a duplicate, we return the metadata of the appended batch to the client.
-        if (isFromClient) {
+        if (origin == AppendOrigin.Client) {
           maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
             return (updatedProducers, completedTxns.toList, Some(duplicate))
           }
@@ -1288,11 +1298,7 @@ class Log(@volatile var dir: File,
         else
           None
 
-        val maybeCompletedTxn = updateProducers(batch,
-          updatedProducers,
-          firstOffsetMetadata = firstOffsetMetadata,
-          isFromClient = isFromClient)
-
+        val maybeCompletedTxn = updateProducers(batch, updatedProducers, firstOffsetMetadata, origin)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
 
@@ -1319,7 +1325,7 @@ class Log(@volatile var dir: File,
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
-  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
+  private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset: Option[Long] = None
@@ -1333,7 +1339,7 @@ class Log(@volatile var dir: File,
 
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
-      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.Client && batch.baseOffset != 0)
         throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
           s"be 0, but it is ${batch.baseOffset}")
 
@@ -1394,9 +1400,9 @@ class Log(@volatile var dir: File,
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
                               firstOffsetMetadata: Option[LogOffsetMetadata],
-                              isFromClient: Boolean): Option[CompletedTxn] = {
+                              origin: AppendOrigin): Option[CompletedTxn] = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient))
+    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
     appendInfo.append(batch, firstOffsetMetadata)
   }
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index a54c2c2..64e96e1 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -245,7 +245,7 @@ class LogSegment private[log] (val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.Replication)
       val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index c4dda08..b72ab1f 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -33,7 +33,32 @@ import org.apache.kafka.common.utils.Time
 import scala.collection.{Seq, mutable}
 import scala.collection.JavaConverters._
 
-private[kafka] object LogValidator extends Logging {
+/**
+ * The source of an append to the log. This is used when determining required validations.
+ */
+private[kafka] sealed trait AppendOrigin
+private[kafka] object AppendOrigin {
+
+  /**
+   * The log append came through replication from the leader. This typically implies minimal validation.
+   * Particularly, we do not decompress record batches in order to validate records individually.
+   */
+  case object Replication extends AppendOrigin
+
+  /**
+   * The log append came from either the group coordinator or the transaction coordinator. We validate
+   * producer epochs for normal log entries (specifically offset commits from the group coordinator) and
+   * we validate coordinate end transaction markers from the transaction coordinator.
+   */
+  case object Coordinator extends AppendOrigin
+
+  /**
+   * The log append came from the client, which implies full validation.
+   */
+  case object Client extends AppendOrigin
+}
+
+private[log] object LogValidator extends Logging {
 
   /**
    * Update the offsets for this message set and do further validation on messages including:
@@ -50,37 +75,37 @@ private[kafka] object LogValidator extends Logging {
    * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
    * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
    */
-  private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
-                                                      topicPartition: TopicPartition,
-                                                      offsetCounter: LongRef,
-                                                      time: Time,
-                                                      now: Long,
-                                                      sourceCodec: CompressionCodec,
-                                                      targetCodec: CompressionCodec,
-                                                      compactedTopic: Boolean,
-                                                      magic: Byte,
-                                                      timestampType: TimestampType,
-                                                      timestampDiffMaxMs: Long,
-                                                      partitionLeaderEpoch: Int,
-                                                      isFromClient: Boolean,
-                                                      interBrokerProtocolVersion: ApiVersion,
-                                                      brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
+  private[log] def validateMessagesAndAssignOffsets(records: MemoryRecords,
+                                                    topicPartition: TopicPartition,
+                                                    offsetCounter: LongRef,
+                                                    time: Time,
+                                                    now: Long,
+                                                    sourceCodec: CompressionCodec,
+                                                    targetCodec: CompressionCodec,
+                                                    compactedTopic: Boolean,
+                                                    magic: Byte,
+                                                    timestampType: TimestampType,
+                                                    timestampDiffMaxMs: Long,
+                                                    partitionLeaderEpoch: Int,
+                                                    origin: AppendOrigin,
+                                                    interBrokerProtocolVersion: ApiVersion,
+                                                    brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
         convertAndAssignOffsetsNonCompressed(records, topicPartition, offsetCounter, compactedTopic, time, now, timestampType,
-          timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient, brokerTopicStats)
+          timestampDiffMaxMs, magic, partitionLeaderEpoch, origin, brokerTopicStats)
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
-          partitionLeaderEpoch, isFromClient, magic, brokerTopicStats)
+          partitionLeaderEpoch, origin, magic, brokerTopicStats)
     } else {
       validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
-        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion, brokerTopicStats)
+        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin, interBrokerProtocolVersion, brokerTopicStats)
     }
   }
 
-  private[kafka] def getFirstBatchAndMaybeValidateNoMoreBatches(records: MemoryRecords, sourceCodec: CompressionCodec): RecordBatch = {
+  private def getFirstBatchAndMaybeValidateNoMoreBatches(records: MemoryRecords, sourceCodec: CompressionCodec): RecordBatch = {
     val batchIterator = records.batches.iterator
 
     if (!batchIterator.hasNext) {
@@ -99,14 +124,19 @@ private[kafka] object LogValidator extends Logging {
     batch
   }
 
-  private def validateBatch(topicPartition: TopicPartition, firstBatch: RecordBatch, batch: RecordBatch, isFromClient: Boolean, toMagic: Byte, brokerTopicStats: BrokerTopicStats): Unit = {
+  private def validateBatch(topicPartition: TopicPartition,
+                            firstBatch: RecordBatch,
+                            batch: RecordBatch,
+                            origin: AppendOrigin,
+                            toMagic: Byte,
+                            brokerTopicStats: BrokerTopicStats): Unit = {
     // batch magic byte should have the same magic as the first batch
     if (firstBatch.magic() != batch.magic()) {
       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
       throw new InvalidRecordException(s"Batch magic ${batch.magic()} is not the same as the first batch'es magic byte ${firstBatch.magic()} in topic partition $topicPartition.")
     }
 
-    if (isFromClient) {
+    if (origin == AppendOrigin.Client) {
       if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
         val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
         if (countFromOffsets <= 0) {
@@ -128,15 +158,15 @@ private[kafka] object LogValidator extends Logging {
         }
       }
 
-      if (batch.hasProducerId && batch.baseSequence < 0) {
+      if (batch.isControlBatch) {
         brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-        throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
-          s"with producerId ${batch.producerId} in topic partition $topicPartition.")
+        throw new InvalidRecordException(s"Clients are not allowed to write control records in topic partition $topicPartition.")
       }
 
-      if (batch.isControlBatch) {
+      if (batch.hasProducerId && batch.baseSequence < 0) {
         brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-        throw new InvalidRecordException(s"Clients are not allowed to write control records in topic partition $topicPartition.")
+        throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
+          s"with producerId ${batch.producerId} in topic partition $topicPartition.")
       }
     }
 
@@ -185,7 +215,7 @@ private[kafka] object LogValidator extends Logging {
                                                    timestampDiffMaxMs: Long,
                                                    toMagicValue: Byte,
                                                    partitionLeaderEpoch: Int,
-                                                   isFromClient: Boolean,
+                                                   origin: AppendOrigin,
                                                    brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
     val startNanos = time.nanoseconds
     val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
@@ -203,7 +233,7 @@ private[kafka] object LogValidator extends Logging {
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec)
 
     for (batch <- records.batches.asScala) {
-      validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats)
+      validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, brokerTopicStats)
 
       for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
         validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
@@ -232,7 +262,7 @@ private[kafka] object LogValidator extends Logging {
                                          timestampType: TimestampType,
                                          timestampDiffMaxMs: Long,
                                          partitionLeaderEpoch: Int,
-                                         isFromClient: Boolean,
+                                         origin: AppendOrigin,
                                          magic: Byte,
                                          brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
@@ -242,7 +272,7 @@ private[kafka] object LogValidator extends Logging {
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec)
 
     for (batch <- records.batches.asScala) {
-      validateBatch(topicPartition, firstBatch, batch, isFromClient, magic, brokerTopicStats)
+      validateBatch(topicPartition, firstBatch, batch, origin, magic, brokerTopicStats)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
@@ -309,7 +339,7 @@ private[kafka] object LogValidator extends Logging {
                                                  timestampType: TimestampType,
                                                  timestampDiffMaxMs: Long,
                                                  partitionLeaderEpoch: Int,
-                                                 isFromClient: Boolean,
+                                                 origin: AppendOrigin,
                                                  interBrokerProtocolVersion: ApiVersion,
                                                  brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
 
@@ -343,7 +373,7 @@ private[kafka] object LogValidator extends Logging {
 
     val batches = records.batches.asScala
     for (batch <- batches) {
-      validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagic, brokerTopicStats)
+      validateBatch(topicPartition, firstBatch, batch, origin, toMagic, brokerTopicStats)
       uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
       // if we are on version 2 and beyond, and we know we are going for in place assignment,
@@ -391,8 +421,8 @@ private[kafka] object LogValidator extends Logging {
         val first = records.batches.asScala.head
         (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
       }
-      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
-        validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec),
+        now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch,
         uncompressedSizeInBytes)
     } else {
       // we can update the batch only and write the compressed payload as is;
@@ -432,7 +462,6 @@ private[kafka] object LogValidator extends Logging {
                                            baseSequence: Int,
                                            isTransactional: Boolean,
                                            partitionLeaderEpoch: Int,
-                                           isFromClient: Boolean,
                                            uncompressedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
     val startNanos = time.nanoseconds
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 04dafd8..f195906 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -26,7 +26,6 @@ import kafka.server.LogOffsetMetadata
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
-import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
@@ -44,28 +43,6 @@ class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
 
 
-// ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance
-private[log] sealed trait ValidationType
-private[log] object ValidationType {
-
-  /**
-    * This indicates no validation should be performed on the incoming append. This is the case for all appends on
-    * a replica, as well as appends when the producer state is being built from the log.
-    */
-  case object None extends ValidationType
-
-  /**
-    * We only validate the epoch (and not the sequence numbers) for offset commit requests coming from the transactional
-    * producer. These appends will not have sequence numbers, so we can't validate them.
-    */
-  case object EpochOnly extends ValidationType
-
-  /**
-    * Perform the full validation. This should be used fo regular produce requests coming to the leader.
-    */
-  case object Full extends ValidationType
-}
-
 private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) {
   def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
 
@@ -79,12 +56,18 @@ private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffset
 
 private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
-  def empty(producerId: Long) = new ProducerStateEntry(producerId, mutable.Queue[BatchMetadata](), RecordBatch.NO_PRODUCER_EPOCH, -1, None)
+
+  def empty(producerId: Long) = new ProducerStateEntry(producerId,
+    batchMetadata = mutable.Queue[BatchMetadata](),
+    producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+    coordinatorEpoch = -1,
+    lastTimestamp = RecordBatch.NO_TIMESTAMP,
+    currentTxnFirstOffset = None)
 }
 
 private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
-  def firstSeq =  DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
-  def firstOffset = lastOffset - offsetDelta
+  def firstSeq: Int =  DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
+  def firstOffset: Long = lastOffset - offsetDelta
 
   override def toString: String = {
     "BatchMetadata(" +
@@ -103,28 +86,28 @@ private[log] class ProducerStateEntry(val producerId: Long,
                                       val batchMetadata: mutable.Queue[BatchMetadata],
                                       var producerEpoch: Short,
                                       var coordinatorEpoch: Int,
+                                      var lastTimestamp: Long,
                                       var currentTxnFirstOffset: Option[Long]) {
 
   def firstSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq
 
-  def firstOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset
+  def firstDataOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset
 
   def lastSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq
 
   def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
 
-  def lastTimestamp: Long = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
-
   def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
 
   def isEmpty: Boolean = batchMetadata.isEmpty
 
   def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {
-    maybeUpdateEpoch(producerEpoch)
+    maybeUpdateProducerEpoch(producerEpoch)
     addBatchMetadata(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
+    this.lastTimestamp = timestamp
   }
 
-  def maybeUpdateEpoch(producerEpoch: Short): Boolean = {
+  def maybeUpdateProducerEpoch(producerEpoch: Short): Boolean = {
     if (this.producerEpoch != producerEpoch) {
       batchMetadata.clear()
       this.producerEpoch = producerEpoch
@@ -141,11 +124,12 @@ private[log] class ProducerStateEntry(val producerId: Long,
   }
 
   def update(nextEntry: ProducerStateEntry): Unit = {
-    maybeUpdateEpoch(nextEntry.producerEpoch)
+    maybeUpdateProducerEpoch(nextEntry.producerEpoch)
     while (nextEntry.batchMetadata.nonEmpty)
       addBatchMetadata(nextEntry.batchMetadata.dequeue())
     this.coordinatorEpoch = nextEntry.coordinatorEpoch
     this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
+    this.lastTimestamp = nextEntry.lastTimestamp
   }
 
   def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
@@ -169,6 +153,7 @@ private[log] class ProducerStateEntry(val producerId: Long,
       s"producerEpoch=$producerEpoch, " +
       s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
       s"coordinatorEpoch=$coordinatorEpoch, " +
+      s"lastTimestamp=$lastTimestamp, " +
       s"batchMetadata=$batchMetadata"
   }
 }
@@ -184,39 +169,41 @@ private[log] class ProducerStateEntry(val producerId: Long,
  *                      the most recent appends made by the producer. Validation of the first incoming append will
  *                      be made against the latest append in the current entry. New appends will replace older appends
  *                      in the current entry so that the space overhead is constant.
- * @param validationType Indicates the extent of validation to perform on the appends on this instance. Offset commits
- *                       coming from the producer should have ValidationType.EpochOnly. Appends which aren't from a client
- *                       should have ValidationType.None. Appends coming from a client for produce requests should have
- *                       ValidationType.Full.
+ * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset
+ *               commits, which originate from the group coordinator, do not have sequence numbers and therefore
+ *               only producer epoch validation is done. Appends which come through replciation are not validated
+ *               (we assume the validation has already been done) and appends from clients require full validation.
  */
 private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
                                       val producerId: Long,
                                       val currentEntry: ProducerStateEntry,
-                                      val validationType: ValidationType) {
+                                      val origin: AppendOrigin) extends Logging {
+
   private val transactions = ListBuffer.empty[TxnMetadata]
   private val updatedEntry = ProducerStateEntry.empty(producerId)
 
   updatedEntry.producerEpoch = currentEntry.producerEpoch
   updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
+  updatedEntry.lastTimestamp = currentEntry.lastTimestamp
   updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
 
-  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
-    validationType match {
-      case ValidationType.None =>
-
-      case ValidationType.EpochOnly =>
-        checkProducerEpoch(producerEpoch, offset)
-
-      case ValidationType.Full =>
-        checkProducerEpoch(producerEpoch, offset)
-        checkSequence(producerEpoch, firstSeq, offset)
+  private def maybeValidateDataBatch(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
+    checkProducerEpoch(producerEpoch, offset)
+    if (origin == AppendOrigin.Client) {
+      checkSequence(producerEpoch, firstSeq, offset)
     }
   }
 
   private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
     if (producerEpoch < updatedEntry.producerEpoch) {
-      throw new ProducerFencedException(s"Producer's epoch at offset $offset is no longer valid in " +
-        s"partition $topicPartition: $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (current epoch)")
+      val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
+        s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
+
+      if (origin == AppendOrigin.Replication) {
+        warn(message)
+      } else {
+        throw new ProducerFencedException(message)
+      }
     }
   }
 
@@ -261,8 +248,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
   }
 
-  def append(batch: RecordBatch,
-             firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
+  def append(batch: RecordBatch, firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
       val recordIterator = batch.iterator
       if (recordIterator.hasNext) {
@@ -276,21 +262,21 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       }
     } else {
       val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(LogOffsetMetadata(batch.baseOffset))
-      append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
+      appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
         firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
       None
     }
   }
 
-  def append(epoch: Short,
-             firstSeq: Int,
-             lastSeq: Int,
-             lastTimestamp: Long,
-             firstOffsetMetadata: LogOffsetMetadata,
-             lastOffset: Long,
-             isTransactional: Boolean): Unit = {
+  def appendDataBatch(epoch: Short,
+                      firstSeq: Int,
+                      lastSeq: Int,
+                      lastTimestamp: Long,
+                      firstOffsetMetadata: LogOffsetMetadata,
+                      lastOffset: Long,
+                      isTransactional: Boolean): Unit = {
     val firstOffset = firstOffsetMetadata.messageOffset
-    maybeValidateAppend(epoch, firstSeq, firstOffset)
+    maybeValidateDataBatch(epoch, firstSeq, firstOffset)
     updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
 
     updatedEntry.currentTxnFirstOffset match {
@@ -308,18 +294,26 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     }
   }
 
+  private def checkCoordinatorEpoch(endTxnMarker: EndTransactionMarker, offset: Long): Unit = {
+    if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch) {
+      if (origin == AppendOrigin.Replication) {
+        info(s"Detected invalid coordinator epoch for producerId $producerId at " +
+          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
+          s"is older than previously known coordinator epoch ${updatedEntry.coordinatorEpoch}")
+      } else {
+        throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
+          s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
+          s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
+      }
+    }
+  }
+
   def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
                          producerEpoch: Short,
                          offset: Long,
                          timestamp: Long): CompletedTxn = {
     checkProducerEpoch(producerEpoch, offset)
-
-    if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
-      throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
-        s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
-        s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
-
-    updatedEntry.maybeUpdateEpoch(producerEpoch)
+    checkCoordinatorEpoch(endTxnMarker, offset)
 
     val firstOffset = updatedEntry.currentTxnFirstOffset match {
       case Some(txnFirstOffset) => txnFirstOffset
@@ -328,8 +322,11 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
         offset
     }
 
+    updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
     updatedEntry.currentTxnFirstOffset = None
     updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
+    updatedEntry.lastTimestamp = timestamp
+
     CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
   }
 
@@ -345,6 +342,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       s"lastSequence=${updatedEntry.lastSeq}, " +
       s"currentTxnFirstOffset=${updatedEntry.currentTxnFirstOffset}, " +
       s"coordinatorEpoch=${updatedEntry.coordinatorEpoch}, " +
+      s"lastTimestamp=${updatedEntry.lastTimestamp}, " +
       s"startedTransactions=$transactions)"
   }
 }
@@ -398,7 +396,7 @@ object ProducerStateManager {
 
       struct.getArray(ProducerEntriesField).map { producerEntryObj =>
         val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
-        val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+        val producerId = producerEntryStruct.getLong(ProducerIdField)
         val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
         val seq = producerEntryStruct.getInt(LastSequenceField)
         val offset = producerEntryStruct.getLong(LastOffsetField)
@@ -406,8 +404,12 @@ object ProducerStateManager {
         val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
         val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
         val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-        val newEntry = new ProducerStateEntry(producerId, mutable.Queue[BatchMetadata](BatchMetadata(seq, offset, offsetDelta, timestamp)), producerEpoch,
-          coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
+        val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
+        if (offset >= 0)
+          lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp)
+
+        val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch,
+          coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
         newEntry
       }
     } catch {
@@ -621,17 +623,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
-  def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo = {
-    val validationToPerform =
-      if (!isFromClient)
-        ValidationType.None
-      else if (topicPartition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
-        ValidationType.EpochOnly
-      else
-        ValidationType.Full
-
+  def prepareUpdate(producerId: Long, origin: AppendOrigin): ProducerAppendInfo = {
     val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId))
-    new ProducerAppendInfo(topicPartition, producerId, currentEntry, validationToPerform)
+    new ProducerAppendInfo(topicPartition, producerId, currentEntry, origin)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index abfd03a..e3d53a1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -33,6 +33,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.controller.{KafkaController, ReplicaAssignment}
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
+import kafka.log.AppendOrigin
 import kafka.message.ZStdCompressionCodec
 import kafka.network.RequestChannel
 import kafka.security.authorizer.AuthorizerUtils
@@ -571,7 +572,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         timeout = produceRequest.timeout.toLong,
         requiredAcks = produceRequest.acks,
         internalTopicsAllowed = internalTopicsAllowed,
-        isFromClient = true,
+        origin = AppendOrigin.Client,
         entriesPerPartition = authorizedRequestInfo,
         responseCallback = sendResponseCallback,
         recordConversionStatsCallback = processingStatsCallback)
@@ -1996,7 +1997,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           timeout = config.requestTimeoutMs.toLong,
           requiredAcks = -1,
           internalTopicsAllowed = true,
-          isFromClient = false,
+          origin = AppendOrigin.Coordinator,
           entriesPerPartition = controlRecords,
           responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
       }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 42302bb..180b0e7 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -506,7 +506,7 @@ class ReplicaManager(val config: KafkaConfig,
   def appendRecords(timeout: Long,
                     requiredAcks: Short,
                     internalTopicsAllowed: Boolean,
-                    isFromClient: Boolean,
+                    origin: AppendOrigin,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     delayedProduceLock: Option[Lock] = None,
@@ -514,7 +514,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-        isFromClient = isFromClient, entriesPerPartition, requiredAcks)
+        origin, entriesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
       val produceStatus = localProduceResults.map { case (topicPartition, result) =>
@@ -773,7 +773,7 @@ class ReplicaManager(val config: KafkaConfig,
    * Append the messages to the local replica logs
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
-                               isFromClient: Boolean,
+                               origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
 
@@ -802,7 +802,7 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         try {
           val partition = getPartitionOrException(topicPartition, expectLeader = true)
-          val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
+          val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
           val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 133ce1d..7f50ce1 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -499,8 +499,8 @@ class PartitionTest {
 
     // after makeLeader(() call, partition should know about all the replicas
     // append records with initial leader epoch
-    partition.appendRecordsToLeader(batch1, isFromClient = true)
-    partition.appendRecordsToLeader(batch2, isFromClient = true)
+    partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client, requiredAcks = 0)
+    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
     assertEquals("Expected leader's HW not move", partition.localLogOrException.logStartOffset,
       partition.localLogOrException.highWatermark)
 
@@ -756,7 +756,7 @@ class PartitionTest {
       new SimpleRecord("k2".getBytes, "v2".getBytes),
       new SimpleRecord("k3".getBytes, "v3".getBytes)),
       baseOffset = 0L)
-    partition.appendRecordsToLeader(records, isFromClient = true)
+    partition.appendRecordsToLeader(records, origin = AppendOrigin.Client, requiredAcks = 0)
 
     def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
       val res = partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
@@ -875,8 +875,9 @@ class PartitionTest {
 
     // after makeLeader(() call, partition should know about all the replicas
     // append records with initial leader epoch
-    val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset
-    partition.appendRecordsToLeader(batch2, isFromClient = true)
+    val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, origin = AppendOrigin.Client,
+      requiredAcks = 0).lastOffset
+    partition.appendRecordsToLeader(batch2, origin = AppendOrigin.Client, requiredAcks = 0)
     assertEquals("Expected leader's HW not move", partition.localLogOrException.logStartOffset,
       partition.log.get.highWatermark)
 
@@ -919,7 +920,7 @@ class PartitionTest {
     val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
 
     // append records with the latest leader epoch
-    partition.appendRecordsToLeader(batch3, isFromClient = true)
+    partition.appendRecordsToLeader(batch3, origin = AppendOrigin.Client, requiredAcks = 0)
 
     // fetch from follower not in ISR from log start offset should not add this follower to ISR
     updateFollowerFetchState(follower1, LogOffsetMetadata(0))
@@ -1014,7 +1015,11 @@ class PartitionTest {
       // Append records to partitions, one partition-per-thread
       val futures = partitions.map { partition =>
         executor.submit(CoreUtils.runnable {
-          (1 to 10000).foreach { _ => partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = true) }
+          (1 to 10000).foreach { _ =>
+            partition.appendRecordsToLeader(createRecords(baseOffset = 0),
+              origin = AppendOrigin.Client,
+              requiredAcks = 0)
+          }
         })
       }
       futures.foreach(_.get(15, TimeUnit.SECONDS))
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 709d5a0..c39a23f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.Lock
 
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
-import kafka.log.Log
+import kafka.log.{AppendOrigin, Log}
 import kafka.server._
 import kafka.utils._
 import kafka.utils.timer.MockTimer
@@ -173,7 +173,7 @@ object AbstractCoordinatorConcurrencyTest {
     override def appendRecords(timeout: Long,
                                requiredAcks: Short,
                                internalTopicsAllowed: Boolean,
-                               isFromClient: Boolean,
+                               origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                                delayedProduceLock: Option[Lock] = None,
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cb67c70..54116a3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.Partition
+import kafka.log.AppendOrigin
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@@ -3221,7 +3222,7 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -3336,7 +3337,7 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -3366,7 +3367,7 @@ class GroupCoordinatorTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b0f81c8..b3a7f66 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -28,7 +28,7 @@ import javax.management.ObjectName
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
-import kafka.log.{Log, LogAppendInfo}
+import kafka.log.{AppendOrigin, Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
@@ -1345,7 +1345,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1380,7 +1380,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
@@ -1428,7 +1428,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
@@ -1503,7 +1503,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1604,7 +1604,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1628,7 +1628,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1671,7 +1671,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1749,7 +1749,7 @@ class GroupMetadataManagerTest {
     // expect the offset tombstone
     EasyMock.reset(partition)
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -1877,7 +1877,7 @@ class GroupMetadataManagerTest {
 
     // expect the offset tombstone
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
-      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      origin = EasyMock.eq(AppendOrigin.Coordinator), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.expectLastCall().times(1)
 
@@ -2196,7 +2196,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -2212,7 +2212,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.capture(capturedRecords),
       EasyMock.capture(capturedCallback),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index e462c14..5ab3f82 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -19,12 +19,11 @@ package kafka.coordinator.transaction
 import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
-import javax.management.ObjectName
 
-import kafka.log.Log
+import javax.management.ObjectName
+import kafka.log.{AppendOrigin, Log}
 import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.{MockScheduler, Pool}
-import org.scalatest.Assertions.fail
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -34,13 +33,13 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.MockTime
+import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{After, Before, Test}
-import org.easymock.{Capture, EasyMock, IAnswer}
+import org.scalatest.Assertions.fail
 
-import scala.collection.Map
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.{Map, mutable}
 
 class TransactionStateManagerTest {
 
@@ -558,7 +557,7 @@ class TransactionStateManagerTest {
         EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
           EasyMock.eq((-1).toShort),
           EasyMock.eq(true),
-          EasyMock.eq(false),
+          EasyMock.eq(AppendOrigin.Coordinator),
           EasyMock.eq(recordsByPartition),
           EasyMock.capture(capturedArgument),
           EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
@@ -670,7 +669,7 @@ class TransactionStateManagerTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
-      isFromClient = EasyMock.eq(false),
+      origin = EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index fcb82e5..bdf5cb7 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -509,7 +509,8 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(0L, cleanableOffsets._2)
 
     log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
-      new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0, isFromClient = false)
+      new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator)
     log.roll()
     log.updateHighWatermark(4L)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index bb30287..18d86b6 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -265,10 +265,10 @@ class LogCleanerTest {
     appendProducer1(Seq(1, 2))
     appendProducer2(Seq(2, 3))
     appendProducer1(Seq(3, 4))
-    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
-    log.appendAsLeader(commitMarker(pid2, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
+    log.appendAsLeader(commitMarker(pid2, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer1(Seq(2))
-    log.appendAsLeader(commitMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
     val abortedTransactions = log.collectAbortedTransactions(log.logStartOffset, log.logEndOffset)
 
@@ -306,11 +306,11 @@ class LogCleanerTest {
     appendProducer2(Seq(5, 6))
     appendProducer3(Seq(6, 7))
     appendProducer1(Seq(7, 8))
-    log.appendAsLeader(abortMarker(pid2, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(pid2, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer3(Seq(8, 9))
-    log.appendAsLeader(commitMarker(pid3, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(pid3, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer1(Seq(9, 10))
-    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
     // we have only cleaned the records in the first segment
     val dirtyOffset = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))._1
@@ -341,9 +341,9 @@ class LogCleanerTest {
 
     appendProducer(Seq(1))
     appendProducer(Seq(2, 3))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer(Seq(2))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // cannot remove the marker in this pass because there are still valid records
@@ -352,7 +352,7 @@ class LogCleanerTest {
     assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
 
     appendProducer(Seq(1, 3))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
@@ -389,10 +389,10 @@ class LogCleanerTest {
     val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
 
     appendProducer(Seq(1))
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer(Seq(2))
     appendProducer(Seq(2))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
@@ -422,14 +422,16 @@ class LogCleanerTest {
 
     // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}]
     producer2(Seq(2, 3)) // offsets 2, 3
-    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 4
+    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 4
     log.roll()
 
     // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}], [{2}, {3}, {Producer1: Commit}]
     //  {0, 1},              {2, 3},            {4},                   {5}, {6}, {7} ==> Offsets
     log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 5
     log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 6
-    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 7
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 7
     log.roll()
 
     // first time through the records are removed
@@ -450,7 +452,8 @@ class LogCleanerTest {
     // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
     //  {1},                     {3},                     {4},                 {5}, {6}, {7},                 {8},            {9} ==> Offsets
     producer2(Seq(1)) // offset 8
-    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9
+    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 9
     log.roll()
 
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
@@ -477,7 +480,8 @@ class LogCleanerTest {
     val producerEpoch = 0.toShort
 
     // [{Producer1: Commit}, {2}, {3}]
-    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 7
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0,
+      origin = AppendOrigin.Coordinator) // offset 1
     log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 2
     log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 3
     log.roll()
@@ -511,7 +515,7 @@ class LogCleanerTest {
     appendTransaction(Seq(1))
     log.roll()
 
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // Both the record and the marker should remain after cleaning
@@ -534,7 +538,7 @@ class LogCleanerTest {
     appendTransaction(Seq(1))
     log.roll()
 
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // Both the batch and the marker should remain after cleaning. The batch is retained
@@ -564,9 +568,9 @@ class LogCleanerTest {
 
     appendProducer(Seq(1))
     appendProducer(Seq(2, 3))
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     appendProducer(Seq(3))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     // delete horizon set to 0 to verify marker is not removed early
@@ -593,13 +597,15 @@ class LogCleanerTest {
     logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
-    val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, isFromClient = false)
+    val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
+      origin = AppendOrigin.Replication)
     appendFirstTransaction(Seq(1))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
-    val appendSecondTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch, isFromClient = false)
+    val appendSecondTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
+      origin = AppendOrigin.Replication)
     appendSecondTransaction(Seq(2))
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
 
     log.appendAsLeader(record(1, 1), leaderEpoch = 0)
     log.appendAsLeader(record(2, 1), leaderEpoch = 0)
@@ -632,7 +638,7 @@ class LogCleanerTest {
     val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
 
     appendProducer(Seq(2, 3)) // batch last offset is 1
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     def assertAbortedTransactionIndexed(): Unit = {
@@ -872,7 +878,7 @@ class LogCleanerTest {
 
     appendProducer(Seq(1))
     appendProducer(Seq(2, 3))
-    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
     log.roll()
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
@@ -1625,8 +1631,8 @@ class LogCleanerTest {
                                           producerId: Long,
                                           producerEpoch: Short,
                                           leaderEpoch: Int = 0,
-                                          isFromClient: Boolean = true): Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient)
+                                          origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin)
   }
 
   private def appendIdempotentAsLeader(log: Log,
@@ -1634,7 +1640,7 @@ class LogCleanerTest {
                                        producerEpoch: Short,
                                        isTransactional: Boolean = false,
                                        leaderEpoch: Int = 0,
-                                       isFromClient: Boolean = true): Seq[Int] => LogAppendInfo = {
+                                       origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>
@@ -1646,7 +1652,7 @@ class LogCleanerTest {
       else
         MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords.toArray: _*)
       sequence += simpleRecords.size
-      log.appendAsLeader(records, leaderEpoch, isFromClient)
+      log.appendAsLeader(records, leaderEpoch, origin)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5e0a1a8..6cc6354 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -336,7 +336,8 @@ class LogSegmentTest {
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
     stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
-      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L)))
+      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
+      0, RecordBatch.NO_TIMESTAMP, Some(75L)))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4dba8ba..8eaf47c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -299,7 +299,6 @@ class LogTest {
     assertFalse(Log.FutureDirPattern.matcher(name1).matches())
     val name2 = Log.logDeleteDirName(
       new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
-    System.out.println("name2 = " + name2)
     assertEquals(255, name2.length)
     assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
     assertTrue(Log.DeleteDirPattern.matcher(name2).matches())
@@ -500,6 +499,39 @@ class LogTest {
     log.close()
   }
 
+
+  @Test
+  def testRecoverAfterNonMonotonicCoordinatorEpochWrite(): Unit = {
+    // Due to KAFKA-9144, we may encounter a coordinator epoch which goes backwards.
+    // This test case verifies that recovery logic relaxes validation in this case and
+    // just takes the latest write.
+
+    val producerId = 1L
+    val coordinatorEpoch = 5
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    var log = createLog(logDir, logConfig)
+    val epoch = 0.toShort
+
+    val firstAppendTimestamp = mockTime.milliseconds()
+    appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT,
+      timestamp = firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch)
+    assertEquals(firstAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp)
+
+    mockTime.sleep(log.maxProducerIdExpirationMs)
+    assertEquals(None, log.producerStateManager.lastEntry(producerId))
+
+    val secondAppendTimestamp = mockTime.milliseconds()
+    appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT,
+      timestamp = secondAppendTimestamp, coordinatorEpoch = coordinatorEpoch - 1)
+
+    log.close()
+
+    // Force recovery by setting the recoveryPoint to the log start
+    log = createLog(logDir, logConfig, recoveryPoint = 0L)
+    assertEquals(secondAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp)
+    log.close()
+  }
+
   @Test
   def testProducerSnapshotsRecoveryAfterUncleanShutdownV1(): Unit = {
     testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.minSupportedFor(RecordVersion.V1).version)
@@ -959,10 +991,10 @@ class LogTest {
 
     // create a batch with a couple gaps to simulate compaction
     val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "a".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "c".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "d".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)))
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
@@ -977,8 +1009,8 @@ class LogTest {
 
     // append some more data and then truncate to force rebuilding of the PID map
     val moreRecords = TestUtils.records(baseOffset = baseOffset + 4, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "e".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "f".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "f".getBytes)))
     moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
     log.appendAsFollower(moreRecords)
 
@@ -1002,8 +1034,8 @@ class LogTest {
 
     // create an empty batch
     val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "a".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "a".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes)))
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
@@ -1018,8 +1050,8 @@ class LogTest {
 
     // append some more data and then truncate to force rebuilding of the PID map
     val moreRecords = TestUtils.records(baseOffset = baseOffset + 2, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "e".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "f".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "f".getBytes)))
     moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
     log.appendAsFollower(moreRecords)
 
@@ -1043,10 +1075,10 @@ class LogTest {
 
     // create a batch with a couple gaps to simulate compaction
     val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List(
-      new SimpleRecord(System.currentTimeMillis(), "a".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "c".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "d".getBytes)))
+      new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
+      new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)))
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
@@ -1293,7 +1325,7 @@ class LogTest {
 
     val lastEntry = log.producerStateManager.lastEntry(producerId)
     assertTrue(lastEntry.isDefined)
-    assertEquals(0L, lastEntry.get.firstOffset)
+    assertEquals(0L, lastEntry.get.firstDataOffset)
     assertEquals(0L, lastEntry.get.lastDataOffset)
   }
 
@@ -1312,9 +1344,8 @@ class LogTest {
       new SimpleRecord("bar".getBytes),
       new SimpleRecord("baz".getBytes))
     log.appendAsLeader(records, leaderEpoch = 0)
-    val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
-      isFromClient = false, leaderEpoch = 0)
-    log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
+    val abortAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT)
+    log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -1322,7 +1353,7 @@ class LogTest {
     log.close()
 
     val reopenedLog = createLog(logDir, logConfig)
-    reopenedLog.updateHighWatermark(commitAppendInfo.lastOffset + 1)
+    reopenedLog.updateHighWatermark(abortAppendInfo.lastOffset + 1)
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
 
@@ -1331,9 +1362,10 @@ class LogTest {
                             epoch: Short,
                             offset: Long = 0L,
                             coordinatorEpoch: Int = 0,
-                            partitionLeaderEpoch: Int = 0): MemoryRecords = {
+                            partitionLeaderEpoch: Int = 0,
+                            timestamp: Long = mockTime.milliseconds()): MemoryRecords = {
     val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch)
-    MemoryRecords.withEndTransactionMarker(offset, mockTime.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker)
+    MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, epoch, marker)
   }
 
   @Test
@@ -3542,7 +3574,7 @@ class LogTest {
 
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, offset,
-      System.currentTimeMillis, leaderEpoch)
+      mockTime.milliseconds, leaderEpoch)
     records.foreach(builder.append)
     builder.build()
   }
@@ -3588,8 +3620,7 @@ class LogTest {
     assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // now transaction is committed
-    val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid, epoch),
-      isFromClient = false, leaderEpoch = 0)
+    val commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT)
 
     // first unstable offset is not updated until the high watermark is advanced
     assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
@@ -3942,6 +3973,19 @@ class LogTest {
   }
 
   @Test
+  def testEndTxnWithFencedProducerEpoch(): Unit = {
+    val producerId = 1L
+    val epoch = 5.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+    appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+
+    assertThrows[ProducerFencedException] {
+      appendEndTxnMarkerAsLeader(log, producerId, (epoch - 1).toShort, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+  }
+
+  @Test
   def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -4093,16 +4137,14 @@ class LogTest {
     assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // now first producer's transaction is aborted
-    val abortAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid1, epoch),
-      isFromClient = false, leaderEpoch = 0)
+    val abortAppendInfo = appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT)
     log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
 
     // LSO should now point to one less than the first offset of the second transaction
     assertEquals(secondAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // commit the second transaction
-    val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid2, epoch),
-      isFromClient = false, leaderEpoch = 0)
+    val commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.COMMIT)
     log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
@@ -4136,9 +4178,8 @@ class LogTest {
     assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
 
     // now abort the transaction
-    val appendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
-      isFromClient = false, leaderEpoch = 0)
-    log.updateHighWatermark(appendInfo.lastOffset + 1)
+    val abortAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT)
+    log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
     assertEquals(None, log.firstUnstableOffset)
 
     // now check that a fetch includes the aborted transaction
@@ -4168,7 +4209,7 @@ class LogTest {
     var sequence = 0
     numRecords: Int => {
       val simpleRecords = (sequence until sequence + numRecords).map { seq =>
-        new SimpleRecord(s"$seq".getBytes)
+        new SimpleRecord(mockTime.milliseconds(), s"$seq".getBytes)
       }
       val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
         producerEpoch, sequence, simpleRecords: _*)
@@ -4182,9 +4223,11 @@ class LogTest {
                                          producerEpoch: Short,
                                          controlType: ControlRecordType,
                                          coordinatorEpoch: Int = 0,
-                                         leaderEpoch: Int = 0): Unit = {
-    val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch)
-    log.appendAsLeader(records, isFromClient = false, leaderEpoch = leaderEpoch)
+                                         leaderEpoch: Int = 0,
+                                         timestamp: Long = mockTime.milliseconds()): LogAppendInfo = {
+    val records = endTxnRecords(controlType, producerId, producerEpoch,
+      coordinatorEpoch = coordinatorEpoch, timestamp = timestamp)
+    log.appendAsLeader(records, origin = AppendOrigin.Coordinator, leaderEpoch = leaderEpoch)
   }
 
   private def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = {
@@ -4202,7 +4245,7 @@ class LogTest {
     var sequence = 0
     (offset: Long, numRecords: Int) => {
       val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
-        offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch)
+        offset, mockTime.milliseconds(), producerId, producerEpoch, sequence, true, leaderEpoch)
       for (seq <- sequence until sequence + numRecords) {
         val record = new SimpleRecord(s"$seq".getBytes)
         builder.append(record)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 923ae91..0515e79 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -101,7 +101,7 @@ class LogValidatorTest {
       TimestampType.CREATE_TIME,
       1000L,
       RecordBatch.NO_PRODUCER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       KAFKA_2_3_IV1,
       brokerTopicStats
     )
@@ -128,7 +128,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatedResults.validatedRecords
@@ -168,7 +168,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatedResults.validatedRecords
@@ -212,7 +212,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatedResults.validatedRecords
@@ -272,7 +272,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
   }
@@ -316,7 +316,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatingResults.validatedRecords
@@ -385,7 +385,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatingResults.validatedRecords
@@ -438,7 +438,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatedResults.validatedRecords
@@ -482,7 +482,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatedResults.validatedRecords
@@ -539,7 +539,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = partitionLeaderEpoch,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val validatedRecords = validatedResults.validatedRecords
@@ -592,7 +592,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
   }
@@ -615,7 +615,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
   }
@@ -638,7 +638,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
   }
@@ -661,7 +661,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
   }
@@ -683,7 +683,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -705,7 +705,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -728,7 +728,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords
     checkOffsets(messageWithOffset, offset)
@@ -752,7 +752,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords
     checkOffsets(messageWithOffset, offset)
@@ -777,7 +777,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
@@ -802,7 +802,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
@@ -825,7 +825,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     checkOffsets(validatedResults.validatedRecords, offset)
@@ -850,7 +850,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     checkOffsets(validatedResults.validatedRecords, offset)
@@ -875,7 +875,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     checkOffsets(validatedResults.validatedRecords, offset)
@@ -900,7 +900,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     checkOffsets(validatedResults.validatedRecords, offset)
@@ -925,7 +925,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
   }
@@ -947,7 +947,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = false,
+      origin = AppendOrigin.Coordinator,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
     val batches = TestUtils.toList(result.validatedRecords.batches)
@@ -974,7 +974,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -997,7 +997,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1019,7 +1019,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1041,7 +1041,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1064,7 +1064,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1087,7 +1087,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1112,7 +1112,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1137,7 +1137,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1160,7 +1160,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1183,7 +1183,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats).validatedRecords, offset)
   }
@@ -1205,7 +1205,7 @@ class LogValidatorTest {
         timestampType = TimestampType.CREATE_TIME,
         timestampDiffMaxMs = 5000L,
         partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-        isFromClient = true,
+        origin = AppendOrigin.Client,
         interBrokerProtocolVersion = ApiVersion.latestVersion,
         brokerTopicStats = brokerTopicStats)
     }
@@ -1235,7 +1235,7 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = KAFKA_2_0_IV1,
       brokerTopicStats = brokerTopicStats)
   }
@@ -1269,7 +1269,7 @@ class LogValidatorTest {
         timestampType = TimestampType.CREATE_TIME,
         timestampDiffMaxMs = 1000L,
         partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-        isFromClient = true,
+        origin = AppendOrigin.Client,
         interBrokerProtocolVersion = ApiVersion.latestVersion,
         brokerTopicStats = brokerTopicStats)
     }
@@ -1318,7 +1318,7 @@ class LogValidatorTest {
       timestampType = TimestampType.CREATE_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       brokerTopicStats = brokerTopicStats)
   }
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 8500c94..94f3428 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -117,7 +117,7 @@ class ProducerStateManagerTest {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
+    append(stateManager, producerId, epoch, sequence, offset, origin = AppendOrigin.Replication)
 
     append(stateManager, producerId, epoch, 0, offset + 500)
 
@@ -135,9 +135,9 @@ class ProducerStateManagerTest {
   def testProducerSequenceWithWrapAroundBatchRecord(): Unit = {
     val epoch = 15.toShort
 
-    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = false)
+    val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Replication)
     // Sequence number wrap around
-    appendInfo.append(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
+    appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
       LogOffsetMetadata(2000L), 2020L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
@@ -146,7 +146,7 @@ class ProducerStateManagerTest {
     val lastEntry = stateManager.lastEntry(producerId).get
     assertEquals(Int.MaxValue-10, lastEntry.firstSeq)
     assertEquals(9, lastEntry.lastSeq)
-    assertEquals(2000L, lastEntry.firstOffset)
+    assertEquals(2000L, lastEntry.firstDataOffset)
     assertEquals(2020L, lastEntry.lastDataOffset)
   }
 
@@ -155,7 +155,7 @@ class ProducerStateManagerTest {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
+    append(stateManager, producerId, epoch, sequence, offset, origin = AppendOrigin.Replication)
     append(stateManager, producerId, epoch, 1, offset + 500)
   }
 
@@ -164,7 +164,7 @@ class ProducerStateManagerTest {
     val epoch = 5.toShort
     val sequence = 16
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
+    append(stateManager, producerId, epoch, sequence, offset, origin = AppendOrigin.Replication)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
@@ -174,7 +174,7 @@ class ProducerStateManagerTest {
     assertEquals(sequence, lastEntry.firstSeq)
     assertEquals(sequence, lastEntry.lastSeq)
     assertEquals(offset, lastEntry.lastDataOffset)
-    assertEquals(offset, lastEntry.firstOffset)
+    assertEquals(offset, lastEntry.firstDataOffset)
   }
 
   @Test
@@ -209,11 +209,11 @@ class ProducerStateManagerTest {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.Client)
 
     val firstOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(),
+    producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
       firstOffsetMetadata, offset, isTransactional = true)
     stateManager.update(producerAppendInfo)
 
@@ -231,11 +231,11 @@ class ProducerStateManagerTest {
         partition,
         producerId,
         ProducerStateEntry.empty(producerId),
-        ValidationType.Full
+        AppendOrigin.Client
       )
       val firstOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
         relativePositionInSegment = 50 * relativeOffset)
-      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(),
+      producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
         firstOffsetMetadata, startOffset, isTransactional = true)
       stateManager.update(producerAppendInfo)
     }
@@ -278,15 +278,15 @@ class ProducerStateManagerTest {
   def testPrepareUpdateDoesNotMutate(): Unit = {
     val producerEpoch = 0.toShort
 
-    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    appendInfo.append(producerEpoch, 0, 5, time.milliseconds(),
+    val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(),
       LogOffsetMetadata(15L), 20L, isTransactional = false)
     assertEquals(None, stateManager.lastEntry(producerId))
     stateManager.update(appendInfo)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
-    val nextAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    nextAppendInfo.append(producerEpoch, 6, 10, time.milliseconds(),
+    val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
       LogOffsetMetadata(26L), 30L, isTransactional = false)
     assertTrue(stateManager.lastEntry(producerId).isDefined)
 
@@ -309,25 +309,25 @@ class ProducerStateManagerTest {
     val offset = 9L
     append(stateManager, producerId, producerEpoch, 0, offset)
 
-    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
-    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(),
+    val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client)
+    appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(),
       LogOffsetMetadata(16L), 20L, isTransactional = true)
     var lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(5, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
+    assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(20L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
-    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(),
+    appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(),
       LogOffsetMetadata(26L), 30L, isTransactional = true)
     lastEntry = appendInfo.toEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
+    assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
@@ -344,7 +344,7 @@ class ProducerStateManagerTest {
     // verify that appending the transaction marker doesn't affect the metadata of the cached record batches.
     assertEquals(1, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
+    assertEquals(16L, lastEntry.firstDataOffset)
     assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
     assertEquals(None, lastEntry.currentTxnFirstOffset)
@@ -417,17 +417,78 @@ class ProducerStateManagerTest {
   }
 
   @Test
-  def testRecoverFromSnapshot(): Unit = {
+  def testRecoverFromSnapshotUnfinishedTransaction(): Unit = {
     val epoch = 0.toShort
-    append(stateManager, producerId, epoch, 0, 0L)
-    append(stateManager, producerId, epoch, 1, 1L)
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    append(stateManager, producerId, epoch, 1, 1L, isTransactional = true)
 
     stateManager.takeSnapshot()
     val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
     recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
 
+    // The snapshot only persists the last appended batch metadata
+    val loadedEntry = recoveredMapping.lastEntry(producerId)
+    assertEquals(1, loadedEntry.get.firstDataOffset)
+    assertEquals(1, loadedEntry.get.firstSeq)
+    assertEquals(1, loadedEntry.get.lastDataOffset)
+    assertEquals(1, loadedEntry.get.lastSeq)
+    assertEquals(Some(0), loadedEntry.get.currentTxnFirstOffset)
+
     // entry added after recovery
-    append(recoveredMapping, producerId, epoch, 2, 2L)
+    append(recoveredMapping, producerId, epoch, 2, 2L, isTransactional = true)
+  }
+
+  @Test
+  def testRecoverFromSnapshotFinishedTransaction(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    append(stateManager, producerId, epoch, 1, 1L, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT, offset = 2L)
+
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
+
+    // The snapshot only persists the last appended batch metadata
+    val loadedEntry = recoveredMapping.lastEntry(producerId)
+    assertEquals(1, loadedEntry.get.firstDataOffset)
+    assertEquals(1, loadedEntry.get.firstSeq)
+    assertEquals(1, loadedEntry.get.lastDataOffset)
+    assertEquals(1, loadedEntry.get.lastSeq)
+    assertEquals(None, loadedEntry.get.currentTxnFirstOffset)
+  }
+
+  @Test
+  def testRecoverFromSnapshotEmptyTransaction(): Unit = {
+    val epoch = 0.toShort
+    val appendTimestamp = time.milliseconds()
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT,
+      offset = 0L, timestamp = appendTimestamp)
+    stateManager.takeSnapshot()
+
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(logStartOffset = 0L, logEndOffset = 1L, time.milliseconds)
+
+    val lastEntry = recoveredMapping.lastEntry(producerId)
+    assertTrue(lastEntry.isDefined)
+    assertEquals(appendTimestamp, lastEntry.get.lastTimestamp)
+    assertEquals(None, lastEntry.get.currentTxnFirstOffset)
+  }
+
+  @Test
+  def testProducerStateAfterFencingAbortMarker(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, (epoch + 1).toShort, ControlRecordType.ABORT, offset = 1L)
+
+    val lastEntry = stateManager.lastEntry(producerId).get
+    assertEquals(None, lastEntry.currentTxnFirstOffset)
+    assertEquals(-1, lastEntry.lastDataOffset)
+    assertEquals(-1, lastEntry.firstDataOffset)
+
+    // The producer should not be expired because we want to preserve fencing epochs
+    stateManager.removeExpiredProducers(time.milliseconds())
+    assertTrue(stateManager.lastEntry(producerId).isDefined)
   }
 
   @Test(expected = classOf[UnknownProducerIdException])
@@ -459,7 +520,7 @@ class ProducerStateManagerTest {
     // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Nonetheless
     // the append on a replica should be accepted with the local producer state updated to the appended value.
     assertFalse(recoveredMapping.activeProducers.contains(producerId))
-    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, isFromClient = false)
+    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, origin = AppendOrigin.Replication)
     assertTrue(recoveredMapping.activeProducers.contains(producerId))
     val producerStateEntry = recoveredMapping.activeProducers.get(producerId).head
     assertEquals(epoch, producerStateEntry.producerEpoch)
@@ -475,7 +536,7 @@ class ProducerStateManagerTest {
 
     // First we ensure that we raise an OutOfOrderSequenceException is raised when the append comes from a client.
     try {
-      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = true)
+      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, origin = AppendOrigin.Client)
       fail("Expected an OutOfOrderSequenceException to be raised.")
     } catch {
       case _ : OutOfOrderSequenceException =>
@@ -485,7 +546,7 @@ class ProducerStateManagerTest {
     }
 
     assertEquals(0L, stateManager.activeProducers(producerId).lastSeq)
-    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = false)
+    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, origin = AppendOrigin.Replication)
     assertEquals(outOfOrderSequence, stateManager.activeProducers(producerId).lastSeq)
   }
 
@@ -685,9 +746,10 @@ class ProducerStateManagerTest {
     val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
 
     val epoch = 0.toShort
-    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99, isTransactional = true)
-    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 100, isTransactional = true)
-
+    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99,
+      isTransactional = true, origin = AppendOrigin.Coordinator)
+    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 100,
+      isTransactional = true, origin = AppendOrigin.Coordinator)
   }
 
   @Test(expected = classOf[ProducerFencedException])
@@ -778,7 +840,7 @@ class ProducerStateManagerTest {
     EasyMock.replay(batch)
 
     // Appending the empty control batch should not throw and a new transaction shouldn't be started
-    append(stateManager, producerId, producerEpoch, baseOffset, batch, isFromClient = true)
+    append(stateManager, producerId, producerEpoch, baseOffset, batch, origin = AppendOrigin.Client)
     assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
@@ -819,7 +881,7 @@ class ProducerStateManagerTest {
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
                                  timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Coordinator)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
@@ -836,9 +898,9 @@ class ProducerStateManagerTest {
                      offset: Long,
                      timestamp: Long = time.milliseconds(),
                      isTransactional: Boolean = false,
-                     isFromClient : Boolean = true): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
-    producerAppendInfo.append(producerEpoch, seq, seq, timestamp,
+                     origin : AppendOrigin = AppendOrigin.Client): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
+    producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, timestamp,
       LogOffsetMetadata(offset), offset, isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
@@ -849,14 +911,14 @@ class ProducerStateManagerTest {
                      producerEpoch: Short,
                      offset: Long,
                      batch: RecordBatch,
-                     isFromClient : Boolean): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
+                     origin: AppendOrigin): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, origin)
     producerAppendInfo.append(batch, firstOffsetMetadataOpt = None)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)
   }
 
-  private def currentSnapshotOffsets =
+  private def currentSnapshotOffsets: Set[Long] =
     logDir.listFiles.map(Log.offsetFromFile).toSet
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ddc6b2f..773ea29 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -28,6 +28,7 @@ import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, GroupSummary, MemberSummary}
 import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.AppendOrigin
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.SendResponse
 import kafka.server.QuotaFactory.QuotaManagers
@@ -293,7 +294,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
-      EasyMock.eq(false),
+      EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
       EasyMock.anyObject(),
@@ -332,7 +333,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
-      EasyMock.eq(false),
+      EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
       EasyMock.anyObject(),
@@ -363,7 +364,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
-      EasyMock.eq(false),
+      EasyMock.eq(AppendOrigin.Coordinator),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 73971a7..b05ff8b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Optional, Properties}
 
 import kafka.api.Request
-import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
+import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.cluster.BrokerEndPoint
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
@@ -135,7 +135,7 @@ class ReplicaManagerTest {
         timeout = 0,
         requiredAcks = 3,
         internalTopicsAllowed = false,
-        isFromClient = true,
+        origin = AppendOrigin.Client,
         entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(CompressionType.NONE,
           new SimpleRecord("first message".getBytes))),
         responseCallback = callback)
@@ -343,7 +343,8 @@ class ReplicaManagerTest {
       // now commit the transaction
       val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0)
       val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
-      appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch, isFromClient = false)
+      appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch,
+        origin = AppendOrigin.Coordinator)
         .onFire { response => assertEquals(Errors.NONE, response.error) }
 
       // the LSO has advanced, but the appended commit marker has not been replicated, so
@@ -420,7 +421,8 @@ class ReplicaManagerTest {
       // now abort the transaction
       val endTxnMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0)
       val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
-      appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch, isFromClient = false)
+      appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch,
+        origin = AppendOrigin.Coordinator)
         .onFire { response => assertEquals(Errors.NONE, response.error) }
 
       // fetch as follower to advance the high watermark
@@ -1184,7 +1186,7 @@ class ReplicaManagerTest {
       timeout = 10,
       requiredAcks = -1,
       internalTopicsAllowed = false,
-      isFromClient = true,
+      origin = AppendOrigin.Client,
       entriesPerPartition = Map(topicPartition -> records),
       responseCallback = callback
     )
@@ -1397,7 +1399,7 @@ class ReplicaManagerTest {
   private def appendRecords(replicaManager: ReplicaManager,
                             partition: TopicPartition,
                             records: MemoryRecords,
-                            isFromClient: Boolean = true,
+                            origin: AppendOrigin = AppendOrigin.Client,
                             requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
     def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
@@ -1410,7 +1412,7 @@ class ReplicaManagerTest {
       timeout = 1000,
       requiredAcks = requiredAcks,
       internalTopicsAllowed = false,
-      isFromClient = isFromClient,
+      origin = origin,
       entriesPerPartition = Map(partition -> records),
       responseCallback = appendCallback)
 


Mime
View raw message