kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: MINOR: GroupCoordinator can append with group lock
Date Thu, 25 May 2017 04:02:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 a8dbce47f -> 8dfc098ee


MINOR: GroupCoordinator can append with group lock

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3133 from hachikuji/minor-replica-manager-append-refactor

(cherry picked from commit fdcee8b8b3c027cdc1b13031fa19fcfc7de5609f)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8dfc098e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8dfc098e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8dfc098e

Branch: refs/heads/0.11.0
Commit: 8dfc098ee5c5024a580dab50281e19fec2dd6cbf
Parents: a8dbce4
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed May 24 21:00:44 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed May 24 21:02:42 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/GroupCoordinator.scala    |  25 +--
 .../kafka/coordinator/group/GroupMetadata.scala |   2 +-
 .../group/GroupMetadataManager.scala            |  39 ++--
 .../transaction/TransactionStateManager.scala   |   2 +-
 .../scala/kafka/server/DelayedProduce.scala     |  28 +--
 .../scala/kafka/server/ReplicaManager.scala     |  10 +-
 .../group/GroupCoordinatorTest.scala            |   7 +-
 .../group/GroupMetadataManagerTest.scala        | 222 +++++++++++++++----
 8 files changed, 218 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
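
The change below drops the two-step prepareStore*/store pattern (and the SafeDelayedProduce subclass) in favour of appending directly while the group lock is held: DelayedProduce now takes an optional lock, and completion attempts synchronize on that lock instead of on the operation itself. A minimal, self-contained sketch of that locking idea (the class and method names here are illustrative, not Kafka's):

    // Sketch only: a delayed operation that synchronizes its completion check on a
    // caller-supplied lock, falling back to the operation itself when none is given.
    class LockAwareDelayedOp(lockOpt: Option[Object] = None) {
      private val lock: Object = lockOpt.getOrElse(this)

      // hypothetical completion check; the real DelayedProduce inspects partition acks
      def tryComplete(): Boolean = true

      // other threads attempting completion take the same monitor the caller already
      // holds, so there is a single lock order and no window for deadlock
      def safeTryComplete(): Boolean = lock.synchronized {
        tryComplete()
      }
    }

    object LockAwareDelayedOpExample extends App {
      val group = new Object                       // stands in for the group's lock
      val op = new LockAwareDelayedOp(Some(group))
      group.synchronized {
        println(op.safeTryComplete())              // safe: the monitor is re-entrant
      }
    }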


http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 36e3c63..7c1e002 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -245,8 +245,6 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           groupAssignment: Map[String, Array[Byte]],
                           responseCallback: SyncCallback) {
-    var delayedGroupStore: Option[DelayedStore] = None
-
     group synchronized {
       if (!group.has(memberId)) {
         responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
@@ -271,7 +269,7 @@ class GroupCoordinator(val brokerId: Int,
               val missing = group.allMembers -- groupAssignment.keySet
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
-              delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
+              groupManager.storeGroup(group, assignment, (error: Errors) => {
                 group synchronized {
                   // another member may have joined the group while we were awaiting this callback,
                   // so we must ensure we are still in the AwaitingSync state and the same generation
@@ -297,11 +295,6 @@ class GroupCoordinator(val brokerId: Int,
         }
       }
     }
-
-    // store the group metadata without holding the group lock to avoid the potential
-    // for deadlock if the callback is invoked holding other locks (e.g. the replica
-    // state change lock)
-    delayedGroupStore.foreach(groupManager.store)
   }
 
   def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
@@ -452,15 +445,13 @@ class GroupCoordinator(val brokerId: Int,
                               producerEpoch: Short,
                               offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                               responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-    var delayedOffsetStore: Option[DelayedStore] = None
     group synchronized {
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
         // the group is only using Kafka to store offsets
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
-        delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
-          offsetMetadata, responseCallback, producerId, producerEpoch)
+        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
       } else if (group.is(AwaitingSync)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
       } else if (!group.has(memberId)) {
@@ -470,13 +461,9 @@ class GroupCoordinator(val brokerId: Int,
       } else {
         val member = group.get(memberId)
         completeAndScheduleNextHeartbeatExpiration(group, member)
-        delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
-          offsetMetadata, responseCallback)
+        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
       }
     }
-
-    // store the offsets without holding the group lock
-    delayedOffsetStore.foreach(groupManager.store)
   }
 
   def handleFetchOffsets(groupId: String,
@@ -727,7 +714,6 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onCompleteJoin(group: GroupMetadata) {
-    var delayedStore: Option[DelayedStore] = None
     group synchronized {
       // remove any members who haven't joined the group yet
       group.notYetRejoinedMembers.foreach { failedMember =>
@@ -740,7 +726,7 @@ class GroupCoordinator(val brokerId: Int,
         if (group.is(Empty)) {
           info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
 
-          delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {
+          groupManager.storeGroup(group, Map.empty, error => {
             if (error != Errors.NONE) {
               // we failed to write the empty group metadata. If the broker fails before another rebalance,
               // the previous generation written to the log will become active again (and most likely timeout).
@@ -773,9 +759,6 @@ class GroupCoordinator(val brokerId: Int,
         }
       }
     }
-
-    // call without holding the group lock
-    delayedStore.foreach(groupManager.store)
   }
 
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
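
With this refactor the coordinator no longer carries a delayedGroupStore / delayedOffsetStore var out of the synchronized block; it stores directly while holding the group lock and relies on the callback re-acquiring that same lock before touching group state. A rough sketch of the call shape, using placeholder types rather than the real GroupCoordinator and GroupMetadataManager APIs:

    // Placeholder types; the real code uses GroupMetadata, GroupMetadataManager and Errors.
    class SketchCoordinator(manager: SketchManager) {
      def handleSyncGroup(group: Object, assignment: Map[String, Array[Byte]]): Unit = {
        group.synchronized {
          // the append is issued while the lock is held; this is safe because the
          // resulting delayed produce synchronizes on this same group object
          manager.storeGroup(group, assignment, error => {
            group.synchronized {
              // re-validate the group's state and generation here before applying the result
              println(s"store completed with error=$error")
            }
          })
        }
      }
    }

    class SketchManager {
      def storeGroup(group: Object,
                     assignment: Map[String, Array[Byte]],
                     callback: String => Unit): Unit =
        callback("NONE") // stand-in for the real append plus cache update
    }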

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 7bde4e2..35a1fc7 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -427,7 +427,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
   }.toMap
 
-  def offset(topicPartition: TopicPartition) : Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)
+  def offset(topicPartition: TopicPartition): Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)
 
   // visible for testing
   private[group] def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 8e5135d..b23514a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -100,7 +100,7 @@ class GroupMetadataManager(brokerId: Int,
     scheduler.startup()
 
     scheduler.schedule(name = "delete-expired-group-metadata",
-      fun = cleanupGroupMetadata _,
+      fun = cleanupGroupMetadata,
       period = config.offsetsRetentionCheckIntervalMs,
       unit = TimeUnit.MILLISECONDS)
   }
@@ -145,9 +145,9 @@ class GroupMetadataManager(brokerId: Int,
     }
   }
 
-  def prepareStoreGroup(group: GroupMetadata,
-                        groupAssignment: Map[String, Array[Byte]],
-                        responseCallback: Errors => Unit): Option[DelayedStore] = {
+  def storeGroup(group: GroupMetadata,
+                 groupAssignment: Map[String, Array[Byte]],
+                 responseCallback: Errors => Unit): Unit = {
     getMagic(partitionFor(group.groupId)) match {
       case Some(magicValue) =>
         val groupMetadataValueVersion = {
@@ -224,7 +224,7 @@ class GroupMetadataManager(brokerId: Int,
 
           responseCallback(responseError)
         }
-        Some(DelayedStore(groupMetadataRecords, putCacheCallback))
+        appendForGroup(group, groupMetadataRecords, putCacheCallback)
 
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
@@ -232,27 +232,29 @@ class GroupMetadataManager(brokerId: Int,
     }
   }
 
-  def store(delayedStore: DelayedStore) {
+  private def appendForGroup(group: GroupMetadata,
+                             records: Map[TopicPartition, MemoryRecords],
+                             callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
     // call replica manager to append the group message
     replicaManager.appendRecords(
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
       isFromClient = false,
-      entriesPerPartition = delayedStore.partitionRecords,
-      responseCallback = delayedStore.callback)
+      entriesPerPartition = records,
+      responseCallback = callback,
+      delayedProduceLock = Some(group))
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
    */
-  def prepareStoreOffsets(group: GroupMetadata,
-                          consumerId: String,
-                          generationId: Int,
-                          offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
-                          producerId: Long = RecordBatch.NO_PRODUCER_ID,
-                          producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Option[DelayedStore] = {
+  def storeOffsets(group: GroupMetadata,
+                   consumerId: String,
+                   offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                   responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+                   producerId: Long = RecordBatch.NO_PRODUCER_ID,
+                   producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Unit = {
     // first filter out partitions with offset metadata size exceeding limit
     val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
@@ -331,7 +333,7 @@ class GroupMetadataManager(brokerId: Int,
                 }
 
                 debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId},
consumer $consumerId " +
-                  s"with generation $generationId failed when appending to log due to ${status.error.exceptionName}")
+                  s"with generation ${group.generationId} failed when appending to log due
to ${status.error.exceptionName}")
 
                 // transform the log append error code to the corresponding the commit status error code
                 status.error match {
@@ -376,7 +378,7 @@ class GroupMetadataManager(brokerId: Int,
             }
           }
 
-          Some(DelayedStore(entries, putCacheCallback))
+          appendForGroup(group, entries, putCacheCallback)
 
         case None =>
           val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
@@ -1231,9 +1233,6 @@ object GroupMetadataManager {
 
 }
 
-case class DelayedStore(partitionRecords: Map[TopicPartition, MemoryRecords],
-                        callback: Map[TopicPartition, PartitionResponse] => Unit)
-
 case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
 
   def this(group: String, topic: String, partition: Int) =
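
The new private appendForGroup helper above is what ties the pieces together: every group-metadata or offset write now goes through a single append call that hands the group object to ReplicaManager as the delayedProduceLock. A simplified sketch of that forwarding, with abbreviated stand-in types (only the lock threading is the point):

    // Stand-ins for the real types; only the lock plumbing is shown.
    trait SketchReplicaManager {
      def appendRecords(timeout: Long,
                        requiredAcks: Short,
                        internalTopicsAllowed: Boolean,
                        isFromClient: Boolean,
                        entriesPerPartition: Map[String, Array[Byte]],
                        responseCallback: Map[String, String] => Unit,
                        delayedProduceLock: Option[Object] = None): Unit
    }

    class SketchGroupManager(replicaManager: SketchReplicaManager) {
      def appendForGroup(group: Object,
                         records: Map[String, Array[Byte]],
                         callback: Map[String, String] => Unit): Unit =
        replicaManager.appendRecords(
          timeout = 5000L,                    // illustrative; the real code uses config.offsetCommitTimeoutMs
          requiredAcks = (-1).toShort,        // illustrative; the real code uses config.offsetCommitRequiredAcks
          internalTopicsAllowed = true,
          isFromClient = false,
          entriesPerPartition = records,
          responseCallback = callback,
          delayedProduceLock = Some(group))   // the group doubles as the produce lock
    }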

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index b15077c..e8f8e4d 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -494,7 +494,7 @@ class TransactionStateManager(brokerId: Int,
                 isFromClient = false,
                 recordsPerPartition,
                 updateCacheCallback,
-                delayedProduceSyncObject = Some(newMetadata))
+                delayedProduceLock = Some(newMetadata))
 
               trace(s"Appended new metadata $newMetadata for transaction id $transactionalId
with coordinator epoch $coordinatorEpoch to the local transaction log")
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 5bc0b9b..0ff8d34 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -53,9 +53,12 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                     lockOpt: Option[Object] = None)
   extends DelayedOperation(delayMs) {
 
+  val lock = lockOpt.getOrElse(this)
+
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
     if (status.responseStatus.error == Errors.NONE) {
@@ -69,6 +72,11 @@ class DelayedProduce(delayMs: Long,
     trace("Initial partition status for %s is %s".format(topicPartition, status))
   }
 
+  override def safeTryComplete(): Boolean = lock synchronized {
+    tryComplete()
+  }
+
+
   /**
    * The delayed produce operation can be completed if every partition
    * it produces to is satisfied by one of the following:
@@ -82,7 +90,7 @@ class DelayedProduce(delayMs: Long,
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
     produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
-      trace(s"Checking produce satisfaction for ${topicPartition}, current status $status")
+      trace(s"Checking produce satisfaction for $topicPartition, current status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
         val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
@@ -124,22 +132,6 @@ class DelayedProduce(delayMs: Long,
   }
 }
 
-/**
- * If the responseCallback of this delayed produce object is already synchronized with a different object, then users can
- * apply this extended delayed produce object to avoid calling tryComplete() with synchronization on the operation object to avoid dead-lock
- */
-class SafeDelayedProduce(delayMs: Long,
-                         syncObject: Object,
-                         produceMetadata: ProduceMetadata,
-                         replicaManager: ReplicaManager,
-                         responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
-  extends DelayedProduce(delayMs, produceMetadata, replicaManager, responseCallback) {
-
-  override def safeTryComplete(): Boolean = syncObject synchronized {
-    tryComplete()
-  }
-}
-
 object DelayedProduceMetrics extends KafkaMetricsGroup {
 
   private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fedbafb..cc5bfb0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -339,7 +339,7 @@ class ReplicaManager(val config: KafkaConfig,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                    delayedProduceSyncObject: Option[Object] = None) {
+                    delayedProduceLock: Option[Object] = None) {
 
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
@@ -357,13 +357,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = delayedProduceSyncObject match {
-          case Some(syncObject) =>
-            new SafeDelayedProduce(timeout, syncObject, produceMetadata, this, responseCallback)
-
-          case None =>
-            new DelayedProduce(timeout, produceMetadata, this, responseCallback)
-        }
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
 
         // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
         val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 358e12c..592e343 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1437,10 +1437,11 @@ class GroupCoordinatorTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Option[Object]])
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+          Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+            new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+          )
         )
-      )})
+      })
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dfc098e/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a8a8eae..4c2eb27 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -50,6 +50,7 @@ class GroupMetadataManagerTest {
 
   val groupId = "foo"
   val groupPartitionId = 0
+  val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
   val protocolType = "protocolType"
   val rebalanceTimeout = 60000
   val sessionTimeout = 10000
@@ -82,7 +83,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadOffsetsWithoutGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
 
     val committedOffsets = Map(
@@ -110,7 +111,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadTransactionalOffsetsWithoutGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -144,7 +145,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testDoNotLoadAbortedTransactionalOffsetCommits() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -174,7 +175,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testGroupLoadedWithPendingCommits(): Unit = {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -209,7 +210,7 @@ class GroupMetadataManagerTest {
   @Test
   def testLoadWithCommittedAndAbortedTransactionalOffsetCommits() {
     // A test which loads a log with a mix of committed and aborted transactional offset
committed messages.
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -254,7 +255,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadWithCommittedAndAbortedAndPendingTransactionalOffsetCommits() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -318,7 +319,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadTransactionalOffsetCommitsFromMultipleProducers(): Unit = {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val firstProducerId = 1000L
     val firstProducerEpoch: Short = 2
     val secondProducerId = 1001L
@@ -374,7 +375,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testGroupLoadWithConsumerAndTransactionalOffsetCommitsConsumerWins(): Unit = {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -417,7 +418,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testGroupLoadWithConsumerAndTransactionalOffsetCommitsTransactionWins(): Unit = {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -486,7 +487,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadOffsetsWithTombstones() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
 
     val tombstonePartition = new TopicPartition("foo", 1)
@@ -521,7 +522,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadOffsetsAndGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -554,7 +555,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadGroupWithTombstone() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
 
     val memberId = "98098230493"
@@ -578,7 +579,7 @@ class GroupMetadataManagerTest {
     // 1. the group exists at some point in time, but is later removed (because all members left)
     // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
 
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
 
     val committedOffsets = Map(
@@ -628,8 +629,7 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
-    groupMetadataManager.store(delayedStore)
+    groupMetadataManager.storeGroup(group, Map.empty, callback)
     assertEquals(Some(Errors.NONE), maybeError)
   }
 
@@ -660,8 +660,7 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
-    groupMetadataManager.store(delayedStore)
+    groupMetadataManager.storeGroup(group, Map.empty, callback)
     assertEquals(Some(expectedError), maybeError)
 
     EasyMock.verify(replicaManager)
@@ -691,9 +690,10 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback).get
-    groupMetadataManager.store(delayedStore)
+    groupMetadataManager.storeGroup(group, Map(memberId -> Array[Byte]()), callback)
     assertEquals(Some(Errors.NONE), maybeError)
+
+    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -719,15 +719,15 @@ class GroupMetadataManagerTest {
       maybeError = Some(error)
     }
 
-    groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
+    groupMetadataManager.storeGroup(group, Map(memberId -> Array[Byte]()), callback)
     assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
+
     EasyMock.verify(replicaManager)
   }
 
   @Test
   def testCommitOffset() {
     val memberId = ""
-    val generationId = -1
     val topicPartition = new TopicPartition("foo", 0)
     val offset = 37
 
@@ -746,11 +746,9 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
 
-    groupMetadataManager.store(delayedStore)
-
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
     assertEquals(Some(Errors.NONE), maybeError)
@@ -763,13 +761,132 @@ class GroupMetadataManagerTest {
     val partitionResponse = maybePartitionResponse.get
     assertEquals(Errors.NONE, partitionResponse.error)
     assertEquals(offset, partitionResponse.offset)
+
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testTransactionalCommitOffsetCommitted() {
+    val memberId = ""
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+    val producerId = 232L
+    val producerEpoch = 0.toShort
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+
+    val capturedResponseCallback = appendAndCaptureCallback()
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
+      commitErrors = Some(errors)
+    }
+
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback, producerId, producerEpoch)
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)))
+
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)
+    assertTrue(group.hasOffsets)
+    assertFalse(group.allOffsets.isEmpty)
+    assertEquals(Some(OffsetAndMetadata(offset)), group.offset(topicPartition))
+
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testTransactionalCommitOffsetAppendFailure() {
+    val memberId = ""
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+    val producerId = 232L
+    val producerEpoch = 0.toShort
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+
+    val capturedResponseCallback = appendAndCaptureCallback()
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
+      commitErrors = Some(errors)
+    }
+
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback, producerId, producerEpoch)
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NOT_ENOUGH_REPLICAS, 0L, RecordBatch.NO_TIMESTAMP)))
+
+    assertFalse(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = false)
+    assertFalse(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testTransactionalCommitOffsetAborted() {
+    val memberId = ""
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+    val producerId = 232L
+    val producerEpoch = 0.toShort
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+
+    val capturedResponseCallback = appendAndCaptureCallback()
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
+      commitErrors = Some(errors)
+    }
+
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback, producerId, producerEpoch)
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)))
+
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = false)
+    assertFalse(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    EasyMock.verify(replicaManager)
   }
 
   @Test
   def testCommitOffsetWhenCoordinatorHasMoved() {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None)
     val memberId = ""
-    val generationId = -1
     val topicPartition = new TopicPartition("foo", 0)
     val offset = 37
 
@@ -787,11 +904,12 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
     assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
+
     EasyMock.verify(replicaManager)
   }
 
@@ -811,7 +929,6 @@ class GroupMetadataManagerTest {
     EasyMock.reset(replicaManager)
 
     val memberId = ""
-    val generationId = -1
     val topicPartition = new TopicPartition("foo", 0)
     val offset = 37
 
@@ -822,7 +939,7 @@ class GroupMetadataManagerTest {
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
 
-    expectAppendMessage(appendError)
+    val capturedResponseCallback = appendAndCaptureCallback()
     EasyMock.replay(replicaManager)
 
     var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
@@ -830,10 +947,10 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
-
-    groupMetadataManager.store(delayedStore)
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP)))
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
@@ -849,7 +966,6 @@ class GroupMetadataManagerTest {
   @Test
   def testExpireOffset() {
     val memberId = ""
-    val generationId = -1
     val topicPartition1 = new TopicPartition("foo", 0)
     val topicPartition2 = new TopicPartition("foo", 1)
     val offset = 37
@@ -865,7 +981,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -874,10 +990,9 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
 
-    groupMetadataManager.store(delayedStore)
     assertFalse(commitErrors.isEmpty)
     assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
@@ -899,6 +1014,8 @@ class GroupMetadataManagerTest {
     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
     assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
+
+    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -917,7 +1034,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
       isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -965,7 +1082,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
       isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -1004,7 +1121,6 @@ class GroupMetadataManagerTest {
     // this is a group which is only using kafka for offset storage
 
     val memberId = ""
-    val generationId = -1
     val topicPartition1 = new TopicPartition("foo", 0)
     val topicPartition2 = new TopicPartition("foo", 1)
     val offset = 37
@@ -1020,7 +1136,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1029,10 +1145,9 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
 
-    groupMetadataManager.store(delayedStore)
     assertFalse(commitErrors.isEmpty)
     assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
@@ -1068,6 +1183,8 @@ class GroupMetadataManagerTest {
     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+
+    EasyMock.verify(replicaManager)
   }
 
   @Test
@@ -1097,7 +1214,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1106,10 +1223,9 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId, offsets, callback).get
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
 
-    groupMetadataManager.store(delayedStore)
     assertFalse(commitErrors.isEmpty)
     assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
@@ -1133,8 +1249,24 @@ class GroupMetadataManagerTest {
     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+
+    EasyMock.verify(replicaManager)
   }
 
+  private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])
+    )
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    capturedArgument
+  }
+  
   private def expectAppendMessage(error: Errors) {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
@@ -1146,7 +1278,7 @@ class GroupMetadataManagerTest {
       EasyMock.anyObject().asInstanceOf[Option[Object]])
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+        Map(groupTopicPartition ->
           new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})
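
The updated tests no longer call groupMetadataManager.store(...); instead they capture the responseCallback that storeGroup/storeOffsets passes to ReplicaManager.appendRecords (via appendAndCaptureCallback above) and invoke it by hand to simulate the append completing. The same pattern can be sketched without a mocking library, using a hand-rolled fake with illustrative names:

    // A hand-rolled fake that records the callback handed to appendRecords so a test
    // can complete the "append" whenever it chooses (EasyMock's Capture plays this
    // role in the real tests).
    class FakeReplicaManager {
      var capturedCallback: Option[Map[String, String] => Unit] = None

      def appendRecords(records: Map[String, Array[Byte]],
                        responseCallback: Map[String, String] => Unit,
                        delayedProduceLock: Option[Object] = None): Unit = {
        capturedCallback = Some(responseCallback)   // remember it, don't complete yet
      }
    }

    object FakeReplicaManagerExample extends App {
      val rm = new FakeReplicaManager
      var observedError: Option[String] = None

      rm.appendRecords(Map("foo-0" -> Array[Byte]()), responses => {
        observedError = responses.get("foo-0")      // runs only when the test fires it
      })

      // later, the test simulates the log append finishing successfully
      rm.capturedCallback.foreach(_.apply(Map("foo-0" -> "NONE")))
      println(observedError)                        // prints Some(NONE)
    }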

