kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: MINOR: Some minor improvements to TxnOffsetCommit handling
Date Tue, 16 May 2017 20:23:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d66e7af65 -> 4e3092d27


MINOR: Some minor improvements to TxnOffsetCommit handling

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3040 from hachikuji/txn-offset-commit-cleanups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e3092d2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e3092d2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e3092d2

Branch: refs/heads/trunk
Commit: 4e3092d27648b9db18a496d16cd44f600429dcfc
Parents: d66e7af
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue May 16 13:20:38 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue May 16 13:22:15 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/GroupCoordinator.scala    |  84 ++--
 core/src/main/scala/kafka/log/Log.scala         |  14 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   3 +-
 .../scala/kafka/log/ProducerStateManager.scala  |  40 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  74 ++--
 .../group/GroupCoordinatorResponseTest.scala    |  14 +-
 .../group/GroupCoordinatorTest.scala            |  36 --
 .../src/test/scala/unit/kafka/log/LogTest.scala |   7 +-
 .../kafka/log/ProducerStateManagerTest.scala    | 409 ++++++++++---------
 9 files changed, 350 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 031a9c1..36e3c63 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,8 +27,8 @@ import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse, TransactionResult}
+import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Map, Seq, immutable}
@@ -48,6 +48,8 @@ class GroupCoordinator(val brokerId: Int,
                        val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
                        val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
                        time: Time) extends Logging {
+  import GroupCoordinator._
+
   type JoinCallback = JoinGroupResult => Unit
   type SyncCallback = (Array[Byte], Errors) => Unit
 
@@ -395,42 +397,52 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  def handleTxnCommitOffsets(groupId: String,
+                             producerId: Long,
+                             producerEpoch: Short,
+                             offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                             responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+    validateGroup(groupId) match {
+      case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
+      case None =>
+        val group = groupManager.getGroup(groupId).getOrElse(groupManager.addGroup(new GroupMetadata(groupId)))
+        doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
+    }
+  }
+
   def handleCommitOffsets(groupId: String,
                           memberId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
-                          producerId: Long = RecordBatch.NO_PRODUCER_ID,
-                          producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH) {
-    if (!isActive.get) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR))
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_LOAD_IN_PROGRESS))
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None =>
-          if (generationId < 0) {
-            // the group is not relying on Kafka for group management, so allow the commit
-            val group = groupManager.addGroup(new GroupMetadata(groupId))
-            doCommitOffsets(group, memberId, generationId, producerId, producerEpoch, offsetMetadata, responseCallback)
-          } else {
-            // or this is a request coming from an older generation. either way, reject the commit
-            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
-          }
+                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
+    validateGroup(groupId) match {
+      case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
+      case None =>
+        groupManager.getGroup(groupId) match {
+          case None =>
+            if (generationId < 0) {
+              // the group is not relying on Kafka for group management, so allow the commit
+              val group = groupManager.addGroup(new GroupMetadata(groupId))
+              doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
+                offsetMetadata, responseCallback)
+            } else {
+              // or this is a request coming from an older generation. either way, reject the commit
+              responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
+            }
 
-        case Some(group) =>
-          doCommitOffsets(group, memberId, generationId, producerId, producerEpoch, offsetMetadata, responseCallback)
-      }
+          case Some(group) =>
+            doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
+              offsetMetadata, responseCallback)
+        }
     }
   }
 
   def handleTxnCompletion(producerId: Long,
-                          topicPartitions: Seq[TopicPartition],
+                          offsetsPartitions: Iterable[TopicPartition],
                           transactionResult: TransactionResult) {
-    val offsetPartitions = topicPartitions.filter(_.topic == Topic.GROUP_METADATA_TOPIC_NAME).map(_.partition).toSet
-    groupManager.handleTxnCompletion(producerId, offsetPartitions, transactionResult == TransactionResult.COMMIT)
+    require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
+    val isCommit = transactionResult == TransactionResult.COMMIT
+    groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
   private def doCommitOffsets(group: GroupMetadata,
@@ -444,7 +456,7 @@ class GroupCoordinator(val brokerId: Int,
     group synchronized {
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
-      } else if ((generationId < 0 && group.is(Empty)) || (producerId != RecordBatch.NO_PRODUCER_ID)) {
+      } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
         // the group is only using Kafka to store offsets
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
         delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
@@ -514,6 +526,18 @@ class GroupCoordinator(val brokerId: Int,
     groupManager.cleanupGroupMetadata(Some(topicPartitions))
   }
 
+
+  private def validateGroup(groupId: String): Option[Errors] = {
+    if (!isActive.get)
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    else if (!isCoordinatorForGroup(groupId))
+      Some(Errors.NOT_COORDINATOR)
+    else if (isCoordinatorLoadInProgress(groupId))
+      Some(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+    else
+      None
+  }
+
   private def onGroupUnloaded(group: GroupMetadata) {
     group synchronized {
       info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
@@ -791,6 +815,8 @@ object GroupCoordinator {
   val NoProtocolType = ""
   val NoProtocol = ""
   val NoLeader = ""
+  val NoGeneration = -1
+  val NoMemberId = ""
   val NoMembers = List[MemberSummary]()
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
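
The core of the GroupCoordinator change is visible above: the three repeated coordinator
preconditions are folded into a single validateGroup returning Option[Errors], a dedicated
handleTxnCommitOffsets entry point replaces the default producerId/producerEpoch arguments on
handleCommitOffsets, and handleTxnCompletion now requires callers to pass only
__consumer_offsets partitions. A minimal standalone sketch of the validateGroup pattern
(simplified stand-in names and error types, not the real Kafka classes):

import java.util.concurrent.atomic.AtomicBoolean

object ValidateGroupSketch {
  sealed trait Error
  case object CoordinatorNotAvailable extends Error
  case object NotCoordinator extends Error
  case object CoordinatorLoadInProgress extends Error

  private val isActive = new AtomicBoolean(true)
  private def isCoordinatorForGroup(groupId: String): Boolean = true   // stand-in
  private def isCoordinatorLoadInProgress(groupId: String): Boolean = false // stand-in

  // First failing precondition wins; callers map the error over every partition.
  def validateGroup(groupId: String): Option[Error] =
    if (!isActive.get) Some(CoordinatorNotAvailable)
    else if (!isCoordinatorForGroup(groupId)) Some(NotCoordinator)
    else if (isCoordinatorLoadInProgress(groupId)) Some(CoordinatorLoadInProgress)
    else None

  def main(args: Array[String]): Unit = {
    val offsets = Map("__consumer_offsets-0" -> 42L, "__consumer_offsets-1" -> 7L)
    validateGroup("my-group") match {
      case Some(error) => println(offsets.mapValues(_ => error)) // reject every partition
      case None        => println("proceed to doCommitOffsets")
    }
  }
}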

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e3a21d1..dd22a26 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -446,10 +446,8 @@ class Log(@volatile var dir: File,
     val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
-      if (batch.hasProducerId) {
-        val lastEntry = producerStateManager.lastEntry(batch.producerId)
-        updateProducers(batch, loadedProducers, completedTxns, lastEntry, loadingFromLog = true)
-      }
+      if (batch.hasProducerId)
+        updateProducers(batch, loadedProducers, completedTxns, loadingFromLog = true)
     }
     loadedProducers.values.foreach(producerStateManager.update)
     completedTxns.foreach(producerStateManager.completeTxn)
@@ -695,7 +693,7 @@ class Log(@volatile var dir: File,
       // the last appended entry to the client.
       if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
         return (updatedProducers, completedTxns.toList, maybeLastEntry)
-      updateProducers(batch, updatedProducers, completedTxns, maybeLastEntry, loadingFromLog = false)
+      updateProducers(batch, updatedProducers, completedTxns, loadingFromLog = false)
     }
     (updatedProducers, completedTxns.toList, None)
   }
@@ -780,12 +778,10 @@ class Log(@volatile var dir: File,
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
                               completedTxns: ListBuffer[CompletedTxn],
-                              lastEntry: Option[ProducerIdEntry],
                               loadingFromLog: Boolean): Unit = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, new ProducerAppendInfo(producerId, lastEntry, loadingFromLog))
-    val shouldValidateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
-    val maybeCompletedTxn = appendInfo.append(batch, shouldValidateSequenceNumbers)
+    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, loadingFromLog))
+    val maybeCompletedTxn = appendInfo.append(batch)
     maybeCompletedTxn.foreach(completedTxns += _)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index cf3ef0e..70269bb 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -147,8 +147,7 @@ class LogSegment(val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val lastEntry = producerStateManager.lastEntry(producerId)
-      val appendInfo = new ProducerAppendInfo(batch.producerId, lastEntry, loadingFromLog = true)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, loadingFromLog = true)
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
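
Log.scala and LogSegment.scala above both stop constructing ProducerAppendInfo by hand (each
previously did its own lastEntry lookup) and instead call the new
ProducerStateManager.prepareUpdate factory. A reduced sketch of that factory shape, under
simplified stand-in types rather than the real kafka.log classes:

object PrepareUpdateSketch {
  case class ProducerIdEntry(producerEpoch: Short, firstSeq: Int, lastSeq: Int)
  object ProducerIdEntry { val Empty = ProducerIdEntry(-1, -1, -1) }

  class ProducerAppendInfo(val producerId: Long,
                           val initialEntry: ProducerIdEntry,
                           val validateSequenceNumbers: Boolean,
                           val loadingFromLog: Boolean)

  class ProducerStateManager(topic: String) {
    // the manager decides once, per topic, whether sequence numbers are checked
    private val validateSequenceNumbers = topic != "__consumer_offsets"
    private val producers = scala.collection.mutable.Map.empty[Long, ProducerIdEntry]

    def lastEntry(producerId: Long): Option[ProducerIdEntry] = producers.get(producerId)

    // single construction point, replacing the duplicated lookups in Log and LogSegment
    def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
      new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.Empty),
        validateSequenceNumbers, loadingFromLog)
  }
}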

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ba7c470..5ec91ce 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -26,6 +26,7 @@ import kafka.server.LogOffsetMetadata
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
@@ -62,8 +63,24 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  * It is initialized with the producer's state after the last successful append, and transitively validates the
  * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
  * as the incoming records are validated.
+ *
+ * @param producerId The id of the producer appending to the log
+ * @param initialEntry The last entry associated with the producer id. Validation of the first append will be
+ *                     based off of this entry initially
+ * @param validateSequenceNumbers Whether or not sequence numbers should be validated. The only current use
+ *                                of this is the consumer offsets topic which uses producer ids from incoming
+ *                                TxnOffsetCommit, but has no sequence number to validate and does not depend
+ *                                on the deduplication which sequence numbers provide.
+ * @param loadingFromLog This parameter indicates whether the new append is being loaded directly from the log.
+ *                       This is used to repopulate producer state when the broker is initialized. The only
+ *                       difference in behavior is that we do not validate the sequence number of the first append
+ *                       since we may have lost previous sequence numbers when segments were removed due to log
+ *                       retention enforcement.
  */
-private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: ProducerIdEntry, loadingFromLog: Boolean = false) {
+private[log] class ProducerAppendInfo(val producerId: Long,
+                                      initialEntry: ProducerIdEntry,
+                                      validateSequenceNumbers: Boolean,
+                                      loadingFromLog: Boolean) {
   private var producerEpoch = initialEntry.producerEpoch
   private var firstSeq = initialEntry.firstSeq
   private var lastSeq = initialEntry.lastSeq
@@ -73,14 +90,11 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  def this(producerId: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
-    this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
-
-  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
+  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
     if (this.producerEpoch > producerEpoch) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
         s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server epoch)")
-    } else if (shouldValidateSequenceNumbers) {
+    } else if (validateSequenceNumbers) {
       if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < producerEpoch) {
         if (firstSeq != 0)
           throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
@@ -100,7 +114,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
     }
   }
 
-  def append(batch: RecordBatch, shouldValidateSequenceNumbers: Boolean = true): Option[CompletedTxn] = {
+  def append(batch: RecordBatch): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
       val record = batch.iterator.next()
       val endTxnMarker = EndTransactionMarker.deserialize(record)
@@ -108,7 +122,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
       Some(completedTxn)
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset,
-        batch.isTransactional, shouldValidateSequenceNumbers)
+        batch.isTransactional)
       None
     }
   }
@@ -118,12 +132,11 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
              lastSeq: Int,
              lastTimestamp: Long,
              lastOffset: Long,
-             isTransactional: Boolean,
-             shouldValidateSequenceNumbers: Boolean): Unit = {
+             isTransactional: Boolean): Unit = {
     if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
       // skip validation if this is the first entry when loading from the log. Log retention
       // will generally have removed the beginning entries from each producer id
-      validateAppend(epoch, firstSeq, lastSeq, shouldValidateSequenceNumbers)
+      validateAppend(epoch, firstSeq, lastSeq)
 
     this.producerEpoch = epoch
     this.firstSeq = firstSeq
@@ -325,6 +338,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   import ProducerStateManager._
   import java.util
 
+  private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
   private val producers = mutable.Map.empty[Long, ProducerIdEntry]
   private var lastMapOffset = 0L
   private var lastSnapOffset = 0L
@@ -448,6 +462,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
+  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
+    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers,
+      loadingFromLog)
+
   /**
    * Update the mapping with the given append information
    */
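
The relocated sequence-validation decision is the key point of this file's diff:
ProducerStateManager now derives validateSequenceNumbers from the topic once, instead of every
append site threading a shouldValidateSequenceNumbers flag through. A condensed sketch of the
validateAppend branching shown above, with a generic exception standing in for
ProducerFencedException and OutOfOrderSequenceException (the duplicate-sequence check is omitted
for brevity):

object ValidateAppendSketch {
  val NoProducerEpoch: Short = -1

  def validateAppend(serverEpoch: Short, serverLastSeq: Int,
                     requestEpoch: Short, firstSeq: Int,
                     validateSequenceNumbers: Boolean): Unit = {
    if (serverEpoch > requestEpoch)
      // mirrors ProducerFencedException
      throw new IllegalStateException(
        s"fenced: request epoch $requestEpoch is older than server epoch $serverEpoch")
    else if (validateSequenceNumbers) {
      if (serverEpoch == NoProducerEpoch || serverEpoch < requestEpoch) {
        // a new epoch must restart the sequence at zero (mirrors OutOfOrderSequenceException)
        if (firstSeq != 0)
          throw new IllegalStateException(
            s"epoch bump to $requestEpoch must start at seq 0, got $firstSeq")
      } else if (firstSeq != serverLastSeq + 1)
        throw new IllegalStateException(
          s"out of order: expected seq ${serverLastSeq + 1}, got $firstSeq")
    }
    // for __consumer_offsets (validateSequenceNumbers = false), only the epoch check applies
  }
}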

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5e9cd9f..31680b0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,7 +39,7 @@ import kafka.security.auth._
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
+import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
@@ -1454,17 +1454,21 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
       errors.put(producerId, responseStatus.mapValues(_.error).asJava)
 
-      val successfulPartitions = responseStatus.filter { case (_, partitionResponse) =>
-        partitionResponse.error == Errors.NONE
-      }.keys.toSeq
+      val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) =>
+        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE
+      }.keys
 
-      try {
-        groupCoordinator.handleTxnCompletion(producerId = producerId, topicPartitions = successfulPartitions, transactionResult = result)
-      } catch {
-        case e: Exception =>
-          error(s"Received an exception while trying to update the offsets cache on transaction completion: $e")
-          val producerIdErrors = errors.get(producerId)
-          successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN))
+      if (successfulOffsetsPartitions.nonEmpty) {
+        // as soon as the end transaction marker has been written for a transactional offset commit,
+        // call to the group coordinator to materialize the offsets into the cache
+        try {
+          groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
+        } catch {
+          case e: Exception =>
+            error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
+            val partitionErrors = errors.get(producerId)
+            successfulOffsetsPartitions.foreach(partitionErrors.put(_, Errors.UNKNOWN))
+        }
       }
 
       if (numAppends.decrementAndGet() == 0)
@@ -1597,8 +1601,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
           val exists = metadataCache.contains(topicPartition.topic)
           if (!authorizedForDescribe && exists)
-              debug(s"Transaction Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-                s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
+              debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
+                s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning " +
+                s"${Errors.UNKNOWN_TOPIC_OR_PARTITION.name}")
           authorizedForDescribe && exists
       }
 
@@ -1607,7 +1612,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       // the callback for sending an offset commit response
-      def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
+      def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
         val combinedCommitStatus = commitStatus ++
           unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
           nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -1615,7 +1620,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (isDebugEnabled)
           combinedCommitStatus.foreach { case (topicPartition, error) =>
             if (error != Errors.NONE) {
-              debug(s"TxnOffsetCommit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+              debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
                 s"on partition $topicPartition failed due to ${error.exceptionName}")
             }
           }
@@ -1626,33 +1631,28 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorizedTopics.isEmpty)
         sendResponseCallback(Map.empty)
       else {
-        val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
-
-        // commit timestamp is always set to now.
-        // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
-        val currentTimestamp = time.milliseconds
-        val defaultExpireTimestamp = offsetRetention + currentTimestamp
-        val partitionData = authorizedTopics.mapValues { partitionData =>
-          val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
-          new OffsetAndMetadata(
-            offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
-            commitTimestamp = currentTimestamp,
-            expireTimestamp = defaultExpireTimestamp
-          )
-        }
-
-        // call coordinator to handle commit offset
-        groupCoordinator.handleCommitOffsets(
+        val offsetMetadata = convertTxnOffsets(authorizedTopics)
+        groupCoordinator.handleTxnCommitOffsets(
           txnOffsetCommitRequest.consumerGroupId,
-          null,
-          -1,
-          partitionData,
-          sendResponseCallback,
           txnOffsetCommitRequest.producerId,
-          txnOffsetCommitRequest.producerEpoch)
+          txnOffsetCommitRequest.producerEpoch,
+          offsetMetadata,
+          sendResponseCallback)
       }
     }
+  }
 
+  private def convertTxnOffsets(offsetsMap: immutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
+    val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
+    val currentTimestamp = time.milliseconds
+    val defaultExpireTimestamp = offsetRetention + currentTimestamp
+    offsetsMap.map { case (topicPartition, partitionData) =>
+      val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
+      topicPartition -> new OffsetAndMetadata(
+        offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
+        commitTimestamp = currentTimestamp,
+        expireTimestamp = defaultExpireTimestamp)
+    }
   }
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
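
The extracted convertTxnOffsets stamps every committed offset with the same commit time and a
default expiration of now plus the configured retention. A minimal sketch of that conversion,
with simplified types standing in for OffsetAndMetadata and the request's CommittedOffset:

object ConvertTxnOffsetsSketch {
  case class CommittedOffset(offset: Long, metadata: String)
  case class OffsetAndMetadata(offset: Long, metadata: String,
                               commitTimestamp: Long, expireTimestamp: Long)

  val NoMetadata = ""

  def convertTxnOffsets(offsets: Map[String, CommittedOffset],
                        offsetsRetentionMs: Long,
                        nowMs: Long): Map[String, OffsetAndMetadata] = {
    // commit timestamp is always "now"; default expiration is now + retention
    val defaultExpireTimestamp = nowMs + offsetsRetentionMs
    offsets.map { case (topicPartition, committed) =>
      val metadata = if (committed.metadata == null) NoMetadata else committed.metadata
      topicPartition -> OffsetAndMetadata(committed.offset, metadata, nowMs, defaultExpireTimestamp)
    }
  }

  def main(args: Array[String]): Unit = {
    val converted = convertTxnOffsets(Map("t-0" -> CommittedOffset(42L, null)),
      offsetsRetentionMs = 24L * 60 * 60 * 1000, nowMs = System.currentTimeMillis())
    println(converted)
  }
}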

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
index 0ace2e7..efa0a3b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
@@ -39,9 +39,6 @@ import scala.collection._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise, TimeoutException}
 
-/**
- * Test GroupCoordinator responses
- */
 class GroupCoordinatorResponseTest extends JUnitSuite {
   type JoinGroupCallback = JoinGroupResult => Unit
   type SyncGroupCallbackParams = (Array[Byte], Errors)
@@ -116,6 +113,15 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testOffsetsRetentionMsIntegerOverflow() {
+    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString)
+    val config = KafkaConfig.fromProps(props)
+    val offsetConfig = GroupCoordinator.offsetConfig(config)
+    assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L)
+  }
+
+  @Test
   def testJoinGroupWrongCoordinator() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
@@ -1462,7 +1468,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V2)).anyTimes()
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleCommitOffsets(groupId, null, -1, offsets, responseCallback, producerId, producerEpoch)
+    groupCoordinator.handleTxnCommitOffsets(groupId, producerId, producerEpoch, offsets, responseCallback)
     val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
     EasyMock.reset(replicaManager)
     result
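
testOffsetsRetentionMsIntegerOverflow (moved here from the deleted GroupCoordinatorTest.scala)
guards a classic trap: offsets.retention.minutes is an Int, and multiplying it in Int arithmetic
overflows before any widening to Long. A tiny demonstration of why the 60L multiplier in the
assertion matters:

object RetentionOverflowSketch extends App {
  val retentionMinutes: Int = Integer.MAX_VALUE

  // Int arithmetic overflows first, then widens: the result is garbage (-60000 here)
  val wrongMs: Long = retentionMinutes * 60 * 1000

  // Promoting to Long before multiplying keeps the full value
  val rightMs: Long = retentionMinutes * 60L * 1000L

  println(s"wrong = $wrongMs, right = $rightMs")
}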

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
deleted file mode 100644
index fd981b2..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.coordinator.group
-
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.junit.Assert._
-import org.junit.Test
-
-class GroupCoordinatorTest {
-
-  @Test
-  def testOffsetsRetentionMsIntegerOverflow() {
-    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString)
-    val config = KafkaConfig.fromProps(props)
-    val offsetConfig = GroupCoordinator.offsetConfig(config)
-    assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8c330ed..99ebd15 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -19,6 +19,7 @@ package kafka.log
 
 import java.io._
 import java.nio.ByteBuffer
+import java.nio.file.Files
 import java.util.Properties
 
 import org.apache.kafka.common.errors._
@@ -2270,7 +2271,7 @@ class LogTest {
   }
 
   @Test
-  def testRecoverTransactionIndex(): Unit = {
+  def testFullTransactionIndexRecovery(): Unit = {
     val log = createLog(128)
     val epoch = 0.toShort
 
@@ -2405,7 +2406,7 @@ class LogTest {
 
     // delete all snapshot files
     logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
-      file.delete()
+      Files.delete(file.toPath)
     }
 
     // delete the last offset and transaction index files to force recovery. this should force us to rebuild
@@ -2589,7 +2590,7 @@ class LogTest {
     new Log(logDir,
       config,
       logStartOffset = 0L,
-      recoveryPoint = 0L,
+      recoveryPoint = recoveryPoint,
       scheduler = time.scheduler,
       brokerTopicStats = brokerTopicStats,
       time = time,
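
Two small hardening fixes appear in LogTest above: Files.delete raises an informative exception
where File.delete only returns false, and the log-construction helper now honors its
recoveryPoint parameter instead of hard-coding 0L. A sketch of the deletion difference, using a
throwaway temp file:

import java.io.File
import java.nio.file.{Files, NoSuchFileException}

object DeleteSketch extends App {
  val f = File.createTempFile("00000000000000000001", ".snapshot")

  // java.io.File.delete: failure is a silent boolean, easy to ignore in a test
  val deleted: Boolean = f.delete()
  println(s"File.delete returned $deleted")

  // java.nio.file.Files.delete: failure surfaces as an exception naming the path
  try Files.delete(f.toPath)
  catch { case e: NoSuchFileException => println(s"Files.delete threw: $e") }
}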

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e3092d2/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index ad26339..7227671 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -31,22 +31,22 @@ import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
 
 class ProducerStateManagerTest extends JUnitSuite {
-  var idMappingDir: File = null
-  var idMapping: ProducerStateManager = null
+  var logDir: File = null
+  var stateManager: ProducerStateManager = null
   val partition = new TopicPartition("test", 0)
-  val pid = 1L
+  val producerId = 1L
   val maxPidExpirationMs = 60 * 1000
   val time = new MockTime
 
   @Before
   def setUp(): Unit = {
-    idMappingDir = TestUtils.tempDir()
-    idMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+    logDir = TestUtils.tempDir()
+    stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
   }
 
   @After
   def tearDown(): Unit = {
-    Utils.delete(idMappingDir)
+    Utils.delete(logDir)
   }
 
   @Test
@@ -54,27 +54,27 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 0.toShort
 
     // First entry for id 0 added
-    append(idMapping, pid, 0, epoch, 0L, 0L)
+    append(stateManager, producerId, epoch, 0, 0L, 0L)
 
     // Second entry for id 0 added
-    append(idMapping, pid, 1, epoch, 0L, 1L)
+    append(stateManager, producerId, epoch, 1, 0L, 1L)
 
     // Duplicate sequence number (matches previous sequence number)
     assertThrows[DuplicateSequenceNumberException] {
-      append(idMapping, pid, 1, epoch, 0L, 1L)
+      append(stateManager, producerId, epoch, 1, 0L, 1L)
     }
 
     // Invalid sequence number (greater than next expected sequence number)
     assertThrows[OutOfOrderSequenceException] {
-      append(idMapping, pid, 5, epoch, 0L, 2L)
+      append(stateManager, producerId, epoch, 5, 0L, 2L)
     }
 
     // Change epoch
-    append(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L)
+    append(stateManager, producerId, (epoch + 1).toShort, 0, 0L, 3L)
 
     // Incorrect epoch
     assertThrows[ProducerFencedException] {
-      append(idMapping, pid, 0, epoch, 0L, 4L)
+      append(stateManager, producerId, epoch, 0, 0L, 4L)
     }
   }
 
@@ -83,9 +83,9 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 16
     val offset = 735L
-    append(idMapping, pid, sequence, epoch, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
 
-    val maybeLastEntry = idMapping.lastEntry(pid)
+    val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
 
     val lastEntry = maybeLastEntry.get
@@ -99,17 +99,17 @@ class ProducerStateManagerTest extends JUnitSuite {
   @Test
   def testControlRecordBumpsEpoch(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L)
+    append(stateManager, producerId, epoch, 0, 0L)
 
     val bumpedEpoch = 1.toShort
-    val (completedTxn, lastStableOffset) = appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L)
+    val (completedTxn, lastStableOffset) = appendEndTxnMarker(stateManager, producerId, bumpedEpoch, ControlRecordType.ABORT, 1L)
     assertEquals(1L, completedTxn.firstOffset)
     assertEquals(1L, completedTxn.lastOffset)
     assertEquals(2L, lastStableOffset)
     assertTrue(completedTxn.isAborted)
-    assertEquals(pid, completedTxn.producerId)
+    assertEquals(producerId, completedTxn.producerId)
 
-    val maybeLastEntry = idMapping.lastEntry(pid)
+    val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
 
     val lastEntry = maybeLastEntry.get
@@ -119,8 +119,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
 
     // should be able to append with the new epoch if we start at sequence 0
-    append(idMapping, pid, 0, bumpedEpoch, 2L)
-    assertEquals(Some(0), idMapping.lastEntry(pid).map(_.firstSeq))
+    append(stateManager, producerId, bumpedEpoch, 0, 2L)
+    assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq))
   }
 
   @Test
@@ -128,16 +128,16 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true,
-      shouldValidateSequenceNumbers = true)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+      loadingFromLog = false)
+    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
     producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
-    idMapping.update(producerAppendInfo)
+    stateManager.update(producerAppendInfo)
 
-    assertEquals(Some(logOffsetMetadata), idMapping.firstUnstableOffset)
+    assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset)
   }
 
   @Test
@@ -145,18 +145,18 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true,
-      shouldValidateSequenceNumbers = true)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+      loadingFromLog = false)
+    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
     // use some other offset to simulate a follower append where the log offset metadata won't typically
     // match any of the transaction first offsets
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
     producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
-    idMapping.update(producerAppendInfo)
+    stateManager.update(producerAppendInfo)
 
-    assertEquals(Some(LogOffsetMetadata(offset)), idMapping.firstUnstableOffset)
+    assertEquals(Some(LogOffsetMetadata(offset)), stateManager.firstUnstableOffset)
   }
 
   @Test
@@ -164,11 +164,10 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val coordinatorEpoch = 15
     val offset = 9L
-    append(idMapping, pid, 0, producerEpoch, offset)
+    append(stateManager, producerId, producerEpoch, 0, offset)
 
-    val appendInfo = new ProducerAppendInfo(pid, idMapping.lastEntry(pid), loadingFromLog = false)
-    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true,
-      shouldValidateSequenceNumbers = true)
+    val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
+    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
     var lastEntry = appendInfo.lastEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
@@ -176,10 +175,9 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(16L, lastEntry.firstOffset)
     assertEquals(20L, lastEntry.lastOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
-    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)
+    assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
-    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true,
-      shouldValidateSequenceNumbers = true)
+    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true)
     lastEntry = appendInfo.lastEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(6, lastEntry.firstSeq)
@@ -187,11 +185,11 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(26L, lastEntry.firstOffset)
     assertEquals(30L, lastEntry.lastOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
-    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)
+    assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
     val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
     val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
-    assertEquals(pid, completedTxn.producerId)
+    assertEquals(producerId, completedTxn.producerId)
     assertEquals(16L, completedTxn.firstOffset)
     assertEquals(40L, completedTxn.lastOffset)
     assertFalse(completedTxn.isAborted)
@@ -204,112 +202,112 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(40L, lastEntry.lastOffset)
     assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
     assertEquals(None, lastEntry.currentTxnFirstOffset)
-    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)
+    assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
   }
 
   @Test(expected = classOf[OutOfOrderSequenceException])
   def testOutOfSequenceAfterControlRecordEpochBump(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L)
-    append(idMapping, pid, 1, epoch, 1L)
+    append(stateManager, producerId, epoch, 0, 0L)
+    append(stateManager, producerId, epoch, 1, 1L)
 
     val bumpedEpoch = 1.toShort
-    appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L)
+    appendEndTxnMarker(stateManager, producerId, bumpedEpoch, ControlRecordType.ABORT, 1L)
 
     // next append is invalid since we expect the sequence to be reset
-    append(idMapping, pid, 2, bumpedEpoch, 2L)
+    append(stateManager, producerId, bumpedEpoch, 2, 2L)
   }
 
   @Test(expected = classOf[InvalidTxnStateException])
   def testNonTransactionalAppendWithOngoingTransaction(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L, isTransactional = true)
-    append(idMapping, pid, 1, epoch, 1L, isTransactional = false)
+    append(stateManager, producerId, epoch, 0, 0L, isTransactional = true)
+    append(stateManager, producerId, epoch, 1, 1L, isTransactional = false)
   }
 
   @Test
   def testTruncateAndReloadRemovesOutOfRangeSnapshots(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, epoch, 0, 0L)
-    idMapping.takeSnapshot()
-    append(idMapping, pid, epoch, 1, 1L)
-    idMapping.takeSnapshot()
-    append(idMapping, pid, epoch, 2, 2L)
-    idMapping.takeSnapshot()
-    append(idMapping, pid, epoch, 3, 3L)
-    idMapping.takeSnapshot()
-    append(idMapping, pid, epoch, 4, 4L)
-    idMapping.takeSnapshot()
-
-    idMapping.truncateAndReload(1L, 3L, time.milliseconds())
-
-    assertEquals(Some(2L), idMapping.oldestSnapshotOffset)
-    assertEquals(Some(3L), idMapping.latestSnapshotOffset)
+    append(stateManager, producerId, epoch, 0, 0L)
+    stateManager.takeSnapshot()
+    append(stateManager, producerId, epoch, 1, 1L)
+    stateManager.takeSnapshot()
+    append(stateManager, producerId, epoch, 2, 2L)
+    stateManager.takeSnapshot()
+    append(stateManager, producerId, epoch, 3, 3L)
+    stateManager.takeSnapshot()
+    append(stateManager, producerId, epoch, 4, 4L)
+    stateManager.takeSnapshot()
+
+    stateManager.truncateAndReload(1L, 3L, time.milliseconds())
+
+    assertEquals(Some(2L), stateManager.oldestSnapshotOffset)
+    assertEquals(Some(3L), stateManager.latestSnapshotOffset)
   }
 
   @Test
   def testTakeSnapshot(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L, 0L)
-    append(idMapping, pid, 1, epoch, 1L, 1L)
+    append(stateManager, producerId, epoch, 0, 0L, 0L)
+    append(stateManager, producerId, epoch, 1, 1L, 1L)
 
     // Take snapshot
-    idMapping.takeSnapshot()
+    stateManager.takeSnapshot()
 
     // Check that file exists and it is not empty
-    assertEquals("Directory doesn't contain a single file as expected", 1, idMappingDir.list().length)
-    assertTrue("Snapshot file is empty", idMappingDir.list().head.length > 0)
+    assertEquals("Directory doesn't contain a single file as expected", 1, logDir.list().length)
+    assertTrue("Snapshot file is empty", logDir.list().head.length > 0)
   }
 
   @Test
   def testRecoverFromSnapshot(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L)
-    append(idMapping, pid, 1, epoch, 1L)
+    append(stateManager, producerId, epoch, 0, 0L)
+    append(stateManager, producerId, epoch, 1, 1L)
 
-    idMapping.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
     recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
 
     // entry added after recovery
-    append(recoveredMapping, pid, 2, epoch, 2L)
+    append(recoveredMapping, producerId, epoch, 2, 2L)
   }
 
   @Test(expected = classOf[OutOfOrderSequenceException])
   def testRemoveExpiredPidsOnReload(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L, 0)
-    append(idMapping, pid, 1, epoch, 1L, 1)
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    append(stateManager, producerId, epoch, 1, 1L, 1)
 
-    idMapping.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
     recoveredMapping.truncateAndReload(0L, 1L, 70000)
 
     // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence
     // we should get an out of order sequence exception.
-    append(recoveredMapping, pid, 2, epoch, 2L, 70001)
+    append(recoveredMapping, producerId, epoch, 2, 2L, 70001)
   }
 
   @Test
   def testDeleteSnapshotsBefore(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L)
-    append(idMapping, pid, 1, epoch, 1L)
-    idMapping.takeSnapshot()
-    assertEquals(1, idMappingDir.listFiles().length)
+    append(stateManager, producerId, epoch, 0, 0L)
+    append(stateManager, producerId, epoch, 1, 1L)
+    stateManager.takeSnapshot()
+    assertEquals(1, logDir.listFiles().length)
     assertEquals(Set(2), currentSnapshotOffsets)
 
-    append(idMapping, pid, 2, epoch, 2L)
-    idMapping.takeSnapshot()
-    assertEquals(2, idMappingDir.listFiles().length)
+    append(stateManager, producerId, epoch, 2, 2L)
+    stateManager.takeSnapshot()
+    assertEquals(2, logDir.listFiles().length)
     assertEquals(Set(2, 3), currentSnapshotOffsets)
 
-    idMapping.deleteSnapshotsBefore(3L)
-    assertEquals(1, idMappingDir.listFiles().length)
+    stateManager.deleteSnapshotsBefore(3L)
+    assertEquals(1, logDir.listFiles().length)
     assertEquals(Set(3), currentSnapshotOffsets)
 
-    idMapping.deleteSnapshotsBefore(4L)
-    assertEquals(0, idMappingDir.listFiles().length)
+    stateManager.deleteSnapshotsBefore(4L)
+    assertEquals(0, logDir.listFiles().length)
     assertEquals(Set(), currentSnapshotOffsets)
   }
 
@@ -317,25 +315,25 @@ class ProducerStateManagerTest extends JUnitSuite {
   def testTruncate(): Unit = {
     val epoch = 0.toShort
 
-    append(idMapping, pid, 0, epoch, 0L)
-    append(idMapping, pid, 1, epoch, 1L)
-    idMapping.takeSnapshot()
-    assertEquals(1, idMappingDir.listFiles().length)
+    append(stateManager, producerId, epoch, 0, 0L)
+    append(stateManager, producerId, epoch, 1, 1L)
+    stateManager.takeSnapshot()
+    assertEquals(1, logDir.listFiles().length)
     assertEquals(Set(2), currentSnapshotOffsets)
 
-    append(idMapping, pid, 2, epoch, 2L)
-    idMapping.takeSnapshot()
-    assertEquals(2, idMappingDir.listFiles().length)
+    append(stateManager, producerId, epoch, 2, 2L)
+    stateManager.takeSnapshot()
+    assertEquals(2, logDir.listFiles().length)
     assertEquals(Set(2, 3), currentSnapshotOffsets)
 
-    idMapping.truncate()
+    stateManager.truncate()
 
-    assertEquals(0, idMappingDir.listFiles().length)
+    assertEquals(0, logDir.listFiles().length)
     assertEquals(Set(), currentSnapshotOffsets)
 
-    append(idMapping, pid, 0, epoch, 0L)
-    idMapping.takeSnapshot()
-    assertEquals(1, idMappingDir.listFiles().length)
+    append(stateManager, producerId, epoch, 0, 0L)
+    stateManager.takeSnapshot()
+    assertEquals(1, logDir.listFiles().length)
     assertEquals(Set(1), currentSnapshotOffsets)
   }
 
@@ -344,80 +342,80 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 0.toShort
     val sequence = 0
 
-    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
-    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
-    idMapping.takeSnapshot()
+    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
+    assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset))
+    stateManager.takeSnapshot()
 
-    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 105)
-    idMapping.onHighWatermarkUpdated(106)
-    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
-    idMapping.takeSnapshot()
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 105)
+    stateManager.onHighWatermarkUpdated(106)
+    assertEquals(None, stateManager.firstUnstableOffset.map(_.messageOffset))
+    stateManager.takeSnapshot()
 
-    append(idMapping, pid, sequence + 1, epoch, offset = 106)
-    idMapping.truncateAndReload(0L, 106, time.milliseconds())
-    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
+    append(stateManager, producerId, epoch, sequence + 1, offset = 106)
+    stateManager.truncateAndReload(0L, 106, time.milliseconds())
+    assertEquals(None, stateManager.firstUnstableOffset.map(_.messageOffset))
 
-    idMapping.truncateAndReload(0L, 100L, time.milliseconds())
-    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
+    stateManager.truncateAndReload(0L, 100L, time.milliseconds())
+    assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset))
   }
 
   @Test
   def testFirstUnstableOffsetAfterEviction(): Unit = {
     val epoch = 0.toShort
     val sequence = 0
-    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
-    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
-    append(idMapping, 2L, 0, epoch, offset = 106, isTransactional = true)
-    idMapping.evictUnretainedProducers(100)
-    assertEquals(Some(106), idMapping.firstUnstableOffset.map(_.messageOffset))
+    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
+    assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset))
+    append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true)
+    stateManager.evictUnretainedProducers(100)
+    assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset))
   }
 
   @Test
   def testEvictUnretainedPids(): Unit = {
     val epoch = 0.toShort
 
-    append(idMapping, pid, 0, epoch, 0L)
-    append(idMapping, pid, 1, epoch, 1L)
-    idMapping.takeSnapshot()
+    append(stateManager, producerId, epoch, 0, 0L)
+    append(stateManager, producerId, epoch, 1, 1L)
+    stateManager.takeSnapshot()
 
     val anotherPid = 2L
-    append(idMapping, anotherPid, 0, epoch, 2L)
-    append(idMapping, anotherPid, 1, epoch, 3L)
-    idMapping.takeSnapshot()
+    append(stateManager, anotherPid, epoch, 0, 2L)
+    append(stateManager, anotherPid, epoch, 1, 3L)
+    stateManager.takeSnapshot()
     assertEquals(Set(2, 4), currentSnapshotOffsets)
 
-    idMapping.evictUnretainedProducers(2)
+    stateManager.evictUnretainedProducers(2)
     assertEquals(Set(4), currentSnapshotOffsets)
-    assertEquals(Set(anotherPid), idMapping.activeProducers.keySet)
-    assertEquals(None, idMapping.lastEntry(pid))
+    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
+    assertEquals(None, stateManager.lastEntry(producerId))
 
-    val maybeEntry = idMapping.lastEntry(anotherPid)
+    val maybeEntry = stateManager.lastEntry(anotherPid)
     assertTrue(maybeEntry.isDefined)
     assertEquals(3L, maybeEntry.get.lastOffset)
 
-    idMapping.evictUnretainedProducers(3)
-    assertEquals(Set(anotherPid), idMapping.activeProducers.keySet)
+    stateManager.evictUnretainedProducers(3)
+    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
     assertEquals(Set(4), currentSnapshotOffsets)
-    assertEquals(4, idMapping.mapEndOffset)
+    assertEquals(4, stateManager.mapEndOffset)
 
-    idMapping.evictUnretainedProducers(5)
-    assertEquals(Set(), idMapping.activeProducers.keySet)
+    stateManager.evictUnretainedProducers(5)
+    assertEquals(Set(), stateManager.activeProducers.keySet)
     assertEquals(Set(), currentSnapshotOffsets)
-    assertEquals(5, idMapping.mapEndOffset)
+    assertEquals(5, stateManager.mapEndOffset)
   }
 
   @Test
   def testSkipSnapshotIfOffsetUnchanged(): Unit = {
     val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, 0L, 0L)
+    append(stateManager, producerId, epoch, 0, 0L, 0L)
 
-    idMapping.takeSnapshot()
-    assertEquals(1, idMappingDir.listFiles().length)
+    stateManager.takeSnapshot()
+    assertEquals(1, logDir.listFiles().length)
     assertEquals(Set(1), currentSnapshotOffsets)
 
     // nothing changed so there should be no new snapshot
-    idMapping.takeSnapshot()
-    assertEquals(1, idMappingDir.listFiles().length)
+    stateManager.takeSnapshot()
+    assertEquals(1, logDir.listFiles().length)
     assertEquals(Set(1), currentSnapshotOffsets)
   }
 
@@ -425,16 +423,16 @@ class ProducerStateManagerTest extends JUnitSuite {
   def testStartOffset(): Unit = {
     val epoch = 0.toShort
     val pid2 = 2L
-    append(idMapping, pid2, 0, epoch, 0L, 1L)
-    append(idMapping, pid, 0, epoch, 1L, 2L)
-    append(idMapping, pid, 1, epoch, 2L, 3L)
-    append(idMapping, pid, 2, epoch, 3L, 4L)
-    idMapping.takeSnapshot()
+    append(stateManager, pid2, epoch, 0, 0L, 1L)
+    append(stateManager, producerId, epoch, 0, 1L, 2L)
+    append(stateManager, producerId, epoch, 1, 2L, 3L)
+    append(stateManager, producerId, epoch, 2, 3L, 4L)
+    stateManager.takeSnapshot()
 
     intercept[OutOfOrderSequenceException] {
-      val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+      val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
       recoveredMapping.truncateAndReload(0L, 1L, time.milliseconds)
-      append(recoveredMapping, pid2, 1, epoch, 4L, 5L)
+      append(recoveredMapping, pid2, epoch, 1, 4L, 5L)
     }
   }
 
@@ -442,10 +440,10 @@ class ProducerStateManagerTest extends JUnitSuite {
   def testPidExpirationTimeout() {
     val epoch = 5.toShort
     val sequence = 37
-    append(idMapping, pid, sequence, epoch, 1L)
+    append(stateManager, producerId, epoch, sequence, 1L)
     time.sleep(maxPidExpirationMs + 1)
-    idMapping.removeExpiredProducers(time.milliseconds)
-    append(idMapping, pid, sequence + 1, epoch, 1L)
+    stateManager.removeExpiredProducers(time.milliseconds)
+    append(stateManager, producerId, epoch, sequence + 1, 1L)
   }
 
   @Test
@@ -453,33 +451,33 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 0
 
-    assertEquals(None, idMapping.firstUndecidedOffset)
+    assertEquals(None, stateManager.firstUndecidedOffset)
 
-    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
-    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
-    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
+    assertEquals(Some(99L), stateManager.firstUndecidedOffset)
+    assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset))
 
     val anotherPid = 2L
-    append(idMapping, anotherPid, sequence, epoch, offset = 105, isTransactional = true)
-    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
-    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+    append(stateManager, anotherPid, epoch, sequence, offset = 105, isTransactional = true)
+    assertEquals(Some(99L), stateManager.firstUndecidedOffset)
+    assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset))
 
-    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 109)
-    assertEquals(Some(105L), idMapping.firstUndecidedOffset)
-    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 109)
+    assertEquals(Some(105L), stateManager.firstUndecidedOffset)
+    assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset))
 
-    idMapping.onHighWatermarkUpdated(100L)
-    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+    stateManager.onHighWatermarkUpdated(100L)
+    assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset))
 
-    idMapping.onHighWatermarkUpdated(110L)
-    assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset))
+    stateManager.onHighWatermarkUpdated(110L)
+    assertEquals(Some(105L), stateManager.firstUnstableOffset.map(_.messageOffset))
 
-    appendEndTxnMarker(idMapping, anotherPid, epoch, ControlRecordType.ABORT, offset = 112)
-    assertEquals(None, idMapping.firstUndecidedOffset)
-    assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset))
+    appendEndTxnMarker(stateManager, anotherPid, epoch, ControlRecordType.ABORT, offset = 112)
+    assertEquals(None, stateManager.firstUndecidedOffset)
+    assertEquals(Some(105L), stateManager.firstUnstableOffset.map(_.messageOffset))
 
-    idMapping.onHighWatermarkUpdated(113L)
-    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
+    stateManager.onHighWatermarkUpdated(113L)
+    assertEquals(None, stateManager.firstUnstableOffset.map(_.messageOffset))
   }
 
   @Test
@@ -487,17 +485,28 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 0
 
-    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
-    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
+    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
+    assertEquals(Some(99L), stateManager.firstUndecidedOffset)
 
     time.sleep(maxPidExpirationMs + 1)
-    idMapping.removeExpiredProducers(time.milliseconds)
+    stateManager.removeExpiredProducers(time.milliseconds)
 
-    assertTrue(idMapping.lastEntry(pid).isDefined)
-    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
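+    // a producer with an open transaction must not be expired even after the timeout,
+    // otherwise the transaction's first offset would be lost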
+    assertTrue(stateManager.lastEntry(producerId).isDefined)
+    assertEquals(Some(99L), stateManager.firstUndecidedOffset)
+
+    stateManager.removeExpiredProducers(time.milliseconds)
+    assertTrue(stateManager.lastEntry(producerId).isDefined)
+  }
+
+  @Test
+  def testSequenceNotValidatedForGroupMetadataTopic(): Unit = {
+    val partition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+    val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+
+    val epoch = 0.toShort
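+    // transactional offset commits written by the group coordinator carry no sequence
+    // numbers, so appends with RecordBatch.NO_SEQUENCE are accepted on this topic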
+    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99, isTransactional = true)
+    append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 100, isTransactional = true)
 
-    idMapping.removeExpiredProducers(time.milliseconds)
-    assertTrue(idMapping.lastEntry(pid).isDefined)
   }
 
   @Test(expected = classOf[ProducerFencedException])
@@ -505,10 +514,10 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 0
 
-    assertEquals(None, idMapping.firstUndecidedOffset)
+    assertEquals(None, stateManager.firstUndecidedOffset)
 
-    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
-    appendEndTxnMarker(idMapping, pid, 3.toShort, ControlRecordType.COMMIT, offset=100)
+    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, 3.toShort, ControlRecordType.COMMIT, offset = 100)
   }
 
   @Test
@@ -516,21 +525,21 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 0
 
-    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
-    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
+    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
 
-    val lastEntry = idMapping.lastEntry(pid)
+    val lastEntry = stateManager.lastEntry(producerId)
     assertEquals(Some(1), lastEntry.map(_.coordinatorEpoch))
 
     // writing with the current epoch is allowed
-    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 101, coordinatorEpoch = 1)
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 101, coordinatorEpoch = 1)
 
     // bumping the epoch is allowed
-    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 102, coordinatorEpoch = 2)
+    appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 102, coordinatorEpoch = 2)
 
     // old epochs are not allowed
     try {
-      appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 103, coordinatorEpoch = 1)
+      appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 103, coordinatorEpoch = 1)
       fail("Expected coordinator to be fenced")
     } catch {
       case e: TransactionCoordinatorFencedException =>
@@ -539,49 +548,49 @@ class ProducerStateManagerTest extends JUnitSuite {
 
   @Test(expected = classOf[TransactionCoordinatorFencedException])
   def testCoordinatorFencedAfterReload(): Unit = {
-    val epoch = 0.toShort
-    append(idMapping, pid, 0, epoch, offset = 99, isTransactional = true)
-    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
-    idMapping.takeSnapshot()
+    val producerEpoch = 0.toShort
+    append(stateManager, producerId, producerEpoch, 0, offset = 99, isTransactional = true)
+    appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
+    stateManager.takeSnapshot()
 
-    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
     recoveredMapping.truncateAndReload(0L, 2L, 70000)
 
     // append from old coordinator should be rejected
-    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0)
+    appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0)
   }
 
   private def appendEndTxnMarker(mapping: ProducerStateManager,
-                                 pid: Long,
-                                 epoch: Short,
+                                 producerId: Long,
+                                 producerEpoch: Short,
                                  controlType: ControlRecordType,
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
                                  timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
-    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty))
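+    // stage the marker against the mapping's last entry, validate the coordinator epoch,
+    // then publish the completed transaction and compute the resulting last stable offset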
+    val producerAppendInfo = mapping.prepareUpdate(producerId, loadingFromLog = false)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, epoch, offset, timestamp)
+    val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
     val lastStableOffset = mapping.completeTxn(completedTxn)
     mapping.updateMapEndOffset(offset + 1)
     (completedTxn, lastStableOffset)
   }
 
-  private def append(mapping: ProducerStateManager,
-                     pid: Long,
+  private def append(stateManager: ProducerStateManager,
+                     producerId: Long,
+                     producerEpoch: Short,
                      seq: Int,
-                     epoch: Short,
                      offset: Long,
                      timestamp: Long = time.milliseconds(),
                      isTransactional: Boolean = false,
                      isLoadingFromLog: Boolean = false): Unit = {
-    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid), isLoadingFromLog)
-    producerAppendInfo.append(epoch, seq, seq, timestamp, offset, isTransactional, shouldValidateSequenceNumbers = true)
-    mapping.update(producerAppendInfo)
-    mapping.updateMapEndOffset(offset + 1)
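+    // stage one record's metadata against the manager's last entry for this producer,
+    // then publish the update and advance the map end offset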
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isLoadingFromLog)
+    producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, isTransactional)
+    stateManager.update(producerAppendInfo)
+    stateManager.updateMapEndOffset(offset + 1)
   }
 
   private def currentSnapshotOffsets =
-    idMappingDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet
+    logDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet
 
 }


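For orientation, the reworked helpers above boil down to the following sequence. This is an illustrative sketch only, not part of the commit: partition, logDir, producerId, producerEpoch, maxPidExpirationMs and time are assumed to come from the test fixture, and the API calls follow the signatures visible in the diff.

    // Illustrative sketch; fixture vals (partition, logDir, producerId, producerEpoch,
    // maxPidExpirationMs, time) assumed as in ProducerStateManagerTest above.
    val mgr = new ProducerStateManager(partition, logDir, maxPidExpirationMs)

    // stage and publish a transactional append at offset 99
    val appendInfo = mgr.prepareUpdate(producerId, loadingFromLog = false)
    appendInfo.append(producerEpoch, 0, 0, time.milliseconds(), 99L, true)
    mgr.update(appendInfo)
    mgr.updateMapEndOffset(100L)
    // the open transaction pins the first unstable offset at 99
    assert(mgr.firstUnstableOffset.map(_.messageOffset) == Some(99L))

    // a COMMIT marker decides the transaction, but the first unstable offset only
    // clears once the high watermark passes the marker
    val markerInfo = mgr.prepareUpdate(producerId, loadingFromLog = false)
    val completedTxn = markerInfo.appendEndTxnMarker(
      new EndTransactionMarker(ControlRecordType.COMMIT, 0), producerEpoch, 100L, time.milliseconds())
    mgr.update(markerInfo)
    mgr.completeTxn(completedTxn)
    mgr.updateMapEndOffset(101L)
    mgr.onHighWatermarkUpdated(101L)
    assert(mgr.firstUnstableOffset.isEmpty)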