kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-5280: Protect txn metadata map with read-write lock
Date Tue, 23 May 2017 20:34:56 GMT
KAFKA-5280: Protect txn metadata map with read-write lock

One prerequisite change, two major changes, and one minor change:

0. Change stateLock to a read-write lock.

1. Put the "isCoordinator" and "coordinatorLoading" checks together with the return of the metadata under one read-lock block, since otherwise we can get incorrect behavior if the metadata cache changes after the check but before the metadata is accessed.

2. Grab the read lock right before trying to append to the local txn log and hold it until the local append returns; this avoids the scenario where the coordinator epoch has changed while we are appending to the local log (e.g. an emigration followed by an immigration). A simplified sketch of this locking pattern follows the list.

3. Only watch on txnId, instead of both txnId and txnPartitionId, in the txn marker purgatory, and disable the reaper thread, since we can now safely clear all the delayed operations by traversing the marker queues.
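
To make changes 0-2 concrete, here is a minimal, self-contained Scala sketch of the pattern (it is not part of the patch; the class and method names are hypothetical, while the real implementation is TransactionStateManager in the diff below): the loading/ownership check and the metadata lookup happen under a single read lock, the same read lock is held across the local-log append, and partition immigration/emigration mutate the guarded sets under the write lock.

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Illustrative sketch only; names do not match the real Kafka classes.
class SketchStateManager(numTxnPartitions: Int) {

  sealed trait Result
  case object NotCoordinator extends Result
  case object LoadInProgress extends Result
  final case class Found(state: String) extends Result

  private val stateLock = new ReentrantReadWriteLock()
  private val loadingPartitions = mutable.Set[Int]()                           // guarded by stateLock
  private val metadataCache = mutable.Map[Int, mutable.Map[String, String]]()  // guarded by stateLock

  private def inReadLock[T](body: => T): T = {
    stateLock.readLock().lock()
    try body finally stateLock.readLock().unlock()
  }

  private def inWriteLock[T](body: => T): T = {
    stateLock.writeLock().lock()
    try body finally stateLock.writeLock().unlock()
  }

  private def partitionFor(txnId: String): Int =
    (txnId.hashCode & 0x7fffffff) % numTxnPartitions

  // Change 1: the loading/ownership checks and the metadata lookup sit inside one
  // read-lock block, so the cache cannot change between the check and the access.
  def getState(txnId: String): Result = inReadLock {
    val partition = partitionFor(txnId)
    if (loadingPartitions.contains(partition))
      LoadInProgress
    else metadataCache.get(partition) match {
      case None          => NotCoordinator
      case Some(entries) => entries.get(txnId).map(s => Found(s)).getOrElse(NotCoordinator)
    }
  }

  // Change 2: the read lock is held from the re-check until the local append returns, so an
  // emigration followed by an immigration cannot complete in between and let us append an
  // entry under a stale coordinator epoch.
  def appendState(txnId: String, newState: String)(appendToLocalLog: String => Unit): Result =
    inReadLock {
      getState(txnId) match {            // the read lock is reentrant, so nesting is fine
        case Found(_) =>
          appendToLocalLog(newState)     // stands in for the real local-log append
          Found(newState)
        case other => other              // not the coordinator or still loading: give up
      }
    }

  // Immigration / emigration take the write lock, so they serialize against all of the
  // read-locked check-and-access sections above.
  def startLoading(partition: Int): Unit = inWriteLock { loadingPartitions += partition }

  def finishLoading(partition: Int, entries: mutable.Map[String, String]): Unit = inWriteLock {
    loadingPartitions -= partition
    metadataCache(partition) = entries
  }

  def emigrate(partition: Int): Unit = inWriteLock {
    loadingPartitions -= partition
    metadataCache.remove(partition)
  }
}

In the real patch the same idea is implemented with inReadLock/inWriteLock from kafka.utils.CoreUtils, and the append path additionally re-checks the coordinator epoch under the metadata object's monitor before calling replicaManager.appendRecords (see the TransactionStateManager diff below).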

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Jason Gustafson, Jun Rao

Closes #3082 from guozhangwang/K5231-read-write-lock

(cherry picked from commit 1bf64833168e81015e1f30a30c64eb6849c15422)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6600a36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6600a36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6600a36

Branch: refs/heads/0.11.0
Commit: e6600a36816718c90f896ab89bda6650de60ad78
Parents: 49a2482
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue May 23 13:34:43 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue May 23 13:34:52 2017 -0700

----------------------------------------------------------------------
 .../group/GroupMetadataManager.scala            |   8 +-
 .../transaction/TransactionCoordinator.scala    | 133 ++++-----
 .../TransactionMarkerChannelManager.scala       |  38 ++-
 ...nsactionMarkerRequestCompletionHandler.scala |  41 ++-
 .../transaction/TransactionMetadata.scala       |   2 +-
 .../transaction/TransactionStateManager.scala   | 187 ++++++------
 .../scala/kafka/server/DelayedOperation.scala   |  14 +-
 .../scala/kafka/server/DelayedProduce.scala     |  16 +
 .../src/main/scala/kafka/server/KafkaApis.scala |  12 +-
 .../scala/kafka/server/ReplicaManager.scala     |  14 +-
 .../group/GroupCoordinatorResponseTest.scala    |  14 +-
 .../group/GroupMetadataManagerTest.scala        |   4 +-
 .../TransactionCoordinatorTest.scala            | 292 +++++++------------
 .../TransactionMarkerChannelManagerTest.scala   |   9 +-
 ...tionMarkerRequestCompletionHandlerTest.scala |  24 +-
 .../TransactionStateManagerTest.scala           | 105 ++++---
 16 files changed, 455 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index f8f536e..a7eb28b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -235,12 +235,12 @@ class GroupMetadataManager(brokerId: Int,
   def store(delayedStore: DelayedStore) {
     // call replica manager to append the group message
     replicaManager.appendRecords(
-      config.offsetCommitTimeoutMs.toLong,
-      config.offsetCommitRequiredAcks,
+      timeout = config.offsetCommitTimeoutMs.toLong,
+      requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
       isFromClient = false,
-      delayedStore.partitionRecords,
-      delayedStore.callback)
+      entriesPerPartition = delayedStore.partitionRecords,
+      responseCallback = delayedStore.callback)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index b58c710..b31c0bc 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -48,11 +48,12 @@ object TransactionCoordinator {
       config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
 
     val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
+    // we do not need to turn on the reaper thread since no tasks will expire and there are no completed tasks to be purged
+    val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false, timerEnabled = false)
     val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
-    val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false)
     val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, txnMarkerPurgatory, time)
 
-    new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
+    new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, time)
   }
 
   private def initTransactionError(error: Errors): InitProducerIdResult = {
@@ -77,7 +78,6 @@ class TransactionCoordinator(brokerId: Int,
                              producerIdManager: ProducerIdManager,
                              txnManager: TransactionStateManager,
                              txnMarkerChannelManager: TransactionMarkerChannelManager,
-                             txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
                              time: Time) extends Logging {
   this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
 
@@ -100,35 +100,31 @@ class TransactionCoordinator(brokerId: Int,
       val producerId = producerIdManager.generateProducerId()
       responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
     } else if (transactionalId.isEmpty) {
-      //If transactional id is empty then return error as invalid request. This is
+      // if transactional id is empty then return error as invalid request. This is
       // to make TransactionCoordinator's behavior consistent with producer client
       responseCallback(initTransactionError(Errors.INVALID_REQUEST))
-    } else if (!txnManager.isCoordinatorFor(transactionalId)) {
-      // check if it is the assigned coordinator for the transactional id
-      responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
-    } else if (txnManager.isCoordinatorLoadingInProgress(transactionalId)) {
-      responseCallback(initTransactionError(Errors.COORDINATOR_LOAD_IN_PROGRESS))
     } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
       // check transactionTimeoutMs is not larger than the broker configured maximum allowed value
       responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
     } else {
+      val producerId = producerIdManager.generateProducerId()
+      val now = time.milliseconds()
+      val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
+        producerId = producerId,
+        producerEpoch = 0,
+        txnTimeoutMs = transactionTimeoutMs,
+        state = Empty,
+        topicPartitions = collection.mutable.Set.empty[TopicPartition],
+        txnLastUpdateTimestamp = now)
+
       // only try to get a new producerId and update the cache if the transactional id is unknown
-      val result: Either[InitProducerIdResult, (Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId) match {
-        case None =>
-          val producerId = producerIdManager.generateProducerId()
-          val now = time.milliseconds()
-          val createdMetadata = new TransactionMetadata(
-            transactionalId = transactionalId,
-            producerId = producerId,
-            producerEpoch = 0,
-            txnTimeoutMs = transactionTimeoutMs,
-            state = Empty,
-            topicPartitions = collection.mutable.Set.empty[TopicPartition],
-            txnLastUpdateTimestamp = now)
-
-          val epochAndMetadata = txnManager.addTransaction(createdMetadata)
-          val coordinatorEpoch = epochAndMetadata.coordinatorEpoch
-          val txnMetadata = epochAndMetadata.transactionMetadata
+      val result: Either[InitProducerIdResult, (Int, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId, Some(createdMetadata)) match {
+        case Left(err) =>
+          Left(initTransactionError(err))
+
+        case Right(Some(existingEpochAndMetadata)) =>
+          val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
+          val txnMetadata = existingEpochAndMetadata.transactionMetadata
 
           // there might be a concurrent thread that has just updated the mapping
           // with the transactional id at the same time (hence reference equality will fail);
@@ -141,13 +137,8 @@ class TransactionCoordinator(brokerId: Int,
             }
           }
 
-        case Some(existingEpochAndMetadata) =>
-          val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
-          val txnMetadata = existingEpochAndMetadata.transactionMetadata
-
-          txnMetadata synchronized {
-            initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
-          }
+        case Right(None) =>
+          throw new IllegalStateException("Trying to add metadata to the cache still returns NONE; this is not expected")
       }
 
       result match {
@@ -209,33 +200,24 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
-  private def validateTransactionalId(transactionalId: String): Errors =
-    if (transactionalId == null || transactionalId.isEmpty)
-      Errors.INVALID_REQUEST
-    else if (!txnManager.isCoordinatorFor(transactionalId))
-      Errors.NOT_COORDINATOR
-    else if (txnManager.isCoordinatorLoadingInProgress(transactionalId))
-      Errors.COORDINATOR_LOAD_IN_PROGRESS
-    else
-      Errors.NONE
-
-
   def handleAddPartitionsToTransaction(transactionalId: String,
                                        producerId: Long,
                                        producerEpoch: Short,
                                        partitions: collection.Set[TopicPartition],
                                        responseCallback: AddPartitionsCallback): Unit = {
-    val error = validateTransactionalId(transactionalId)
-    if (error != Errors.NONE) {
-      responseCallback(error)
+    if (transactionalId == null || transactionalId.isEmpty) {
+      responseCallback(Errors.INVALID_REQUEST)
     } else {
       // try to update the transaction metadata and append the updated metadata to txn log;
       // if there is no such metadata treat it as invalid producerId mapping error.
-      val result: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId) match {
-        case None =>
+      val result: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId) match {
+        case Left(err) =>
+          Left(err)
+
+        case Right(None) =>
           Left(Errors.INVALID_PRODUCER_ID_MAPPING)
 
-        case Some(epochAndMetadata) =>
+        case Right(Some(epochAndMetadata)) =>
           val coordinatorEpoch = epochAndMetadata.coordinatorEpoch
           val txnMetadata = epochAndMetadata.transactionMetadata
 
@@ -281,8 +263,8 @@ class TransactionCoordinator(brokerId: Int,
   private def logInvalidStateTransitionAndReturnError(transactionalId: String,
                                                       transactionState: TransactionState,
                                                       transactionResult: TransactionResult) = {
-    error(s"transactionalId: $transactionalId -- Current state is $transactionState, but received transaction " +
-      s"marker result: $transactionResult")
+    debug(s"TransactionalId: $transactionalId's state is $transactionState, but received transaction " +
+      s"marker result to send: $transactionResult")
     Left(Errors.INVALID_TXN_STATE)
   }
 
@@ -291,17 +273,20 @@ class TransactionCoordinator(brokerId: Int,
                            producerEpoch: Short,
                            txnMarkerResult: TransactionResult,
                            responseCallback: EndTxnCallback): Unit = {
-    val error = validateTransactionalId(transactionalId)
-    if (error != Errors.NONE)
-      responseCallback(error)
+    if (transactionalId == null || transactionalId.isEmpty)
+      responseCallback(Errors.INVALID_REQUEST)
     else {
-      val preAppendResult: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId) match {
-        case None =>
+      val preAppendResult: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId) match {
+        case Left(err) =>
+          Left(err)
+
+        case Right(None) =>
           Left(Errors.INVALID_PRODUCER_ID_MAPPING)
 
-        case Some(epochAndTxnMetadata) =>
+        case Right(Some(epochAndTxnMetadata)) =>
           val txnMetadata = epochAndTxnMetadata.transactionMetadata
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
+          val now = time.milliseconds()
 
           txnMetadata synchronized {
             if (txnMetadata.producerId != producerId)
@@ -350,10 +335,12 @@ class TransactionCoordinator(brokerId: Int,
         case Right((coordinatorEpoch, newMetadata)) =>
           def sendTxnMarkersCallback(error: Errors): Unit = {
             if (error == Errors.NONE) {
-              val preSendResult: Either[Errors, (TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId) match {
-                case Some(epochAndMetadata) =>
-                  if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
+              val preSendResult: Either[Errors, (TransactionMetadata, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId) match {
+                case Left(err) =>
+                  Left(err)
 
+                case Right(Some(epochAndMetadata)) =>
+                  if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                     val txnMetadata = epochAndMetadata.transactionMetadata
                     txnMetadata synchronized {
                       if (txnMetadata.producerId != producerId)
@@ -384,16 +371,9 @@ class TransactionCoordinator(brokerId: Int,
                     Left(Errors.NOT_COORDINATOR)
                   }
 
-                case None =>
-                  if (txnManager.isCoordinatorFor(transactionalId)) {
-                    throw new IllegalStateException("Cannot find the metadata in coordinator's cache while it is still the leader of the txn topic partition")
-                  } else {
-                    // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
-                    info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
-                      s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
-
-                    Left(Errors.NOT_COORDINATOR)
-                  }
+                case Right(None) =>
+                  throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
+                    s"no metadata in the cache; this is not expected")
               }
 
               preSendResult match {
@@ -430,9 +410,15 @@ class TransactionCoordinator(brokerId: Int,
         txnIdAndPidEpoch.producerId,
         txnIdAndPidEpoch.producerEpoch,
         TransactionResult.ABORT,
-        (error: Errors) => {
-          if (error != Errors.NONE)
-            warn(s"Rollback ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} aborted due to ${error.exceptionName()}")
+        (error: Errors) => error match {
+          case Errors.NONE =>
+            debug(s"Completed rollback ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} due to timeout")
+          case Errors.INVALID_PRODUCER_ID_MAPPING |
+               Errors.INVALID_PRODUCER_EPOCH |
+               Errors.CONCURRENT_TRANSACTIONS =>
+            debug(s"Rolling back ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} has aborted due to ${error.exceptionName()}")
+          case e =>
+            warn(s"Rolling back ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} failed due to ${error.exceptionName()}")
         })
     }
   }
@@ -464,7 +450,6 @@ class TransactionCoordinator(brokerId: Int,
     info("Shutting down.")
     isActive.set(false)
     scheduler.shutdown()
-    txnMarkerPurgatory.shutdown()
     producerIdManager.shutdown()
     txnManager.shutdown()
     txnMarkerChannelManager.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index b25e82d..461867d 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -113,7 +113,6 @@ class TxnMarkerQueue(@volatile private var destination: Node) {
 
   def node: Node = destination
 
-  // TODO: this function is only for metrics recording, not yet added
   def totalNumMarkers(): Int = markersPerTxnTopicPartition.map { case(_, queue) => queue.size()}.sum
 
   // visible for testing
@@ -140,11 +139,14 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   def start(): Unit = {
     txnMarkerSendThread.start()
-    networkClient.wakeup()    // FIXME: is this really required?
   }
 
   def shutdown(): Unit = {
-    txnMarkerSendThread.shutdown()
+    txnMarkerSendThread.initiateShutdown()
+    // wake up the thread in case it is blocked inside poll
+    networkClient.wakeup()
+    txnMarkerSendThread.awaitShutdown()
+    txnMarkerPurgatory.shutdown()
     markersQueuePerBroker.clear()
   }
 
@@ -195,8 +197,15 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
         case Errors.NONE =>
           trace(s"Completed sending transaction markers for $transactionalId as $txnResult")
 
-          txnStateManager.getTransactionState(transactionalId) match {
-            case Some(epochAndMetadata) =>
+          txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
+            case Left(Errors.NOT_COORDINATOR) =>
+              info(s"I am no longer the coordinator for $transactionalId with coordinator epoch $coordinatorEpoch; cancel appending $newMetadata to transaction log")
+
+            case Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) =>
+              info(s"I am loading the transaction partition that contains $transactionalId while my current coordinator epoch is $coordinatorEpoch; " +
+                s"so appending $newMetadata to the transaction log since the loading process will complete the remaining work")
+
+            case Right(Some(epochAndMetadata)) =>
               if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                 debug(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId succeeded")
 
@@ -225,11 +234,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                   s"has been sent to brokers. The cached metadata have been changed to $epochAndMetadata since preparing to send markers")
               }
 
-            case None =>
-              // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
-              // we will stop appending the completed log entry to transaction topic as the new leader should be doing it.
-              info(s"Updating $transactionalId's transaction state (txn topic partition ${txnStateManager.partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
-                s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
+            case Right(None) =>
+              throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
+                s"no metadata in the cache; this is not expected")
           }
 
         case other =>
@@ -237,14 +244,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
       }
     }
 
-    // watch for both the transactional id and the transaction topic partition id,
-    // so we can cancel all the delayed operations for the same partition id;
-    // NOTE this is only possible because the hashcode of Int / String never overlaps
-
-    // TODO: if the delayed txn marker will always have infinite timeout, we can replace it with a map
     val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback)
-    val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
-    txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId, txnTopicPartition))
+    txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId))
 
     addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
   }
@@ -278,7 +279,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   }
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
-    txnMarkerPurgatory.cancelForKey(txnTopicPartitionId)
     markersQueuePerBroker.foreach { case(_, brokerQueue) =>
       brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
         for (entry: TxnIdAndMarkerEntry <- queue.asScala)
@@ -293,8 +293,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     txnMarkerPurgatory.cancelForKey(transactionalId)
   }
 
-  // FIXME: Currently, operations registered under partition in txnMarkerPurgatory
-  // are only cleaned during coordinator immigration, which happens rarely. This means potential memory leak
   def completeSendMarkersForTxnId(transactionalId: String): Unit = {
     txnMarkerPurgatory.checkAndComplete(transactionalId)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index bcd95a1..7c1c356 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -42,14 +42,24 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
         val transactionalId = txnIdAndMarker.txnId
         val txnMarker = txnIdAndMarker.txnMarkerEntry
 
-        txnStateManager.getTransactionState(transactionalId) match {
-          case None =>
-            info(s"Transaction metadata for $transactionalId does not exist in the cache" +
-              s"any more; cancel sending transaction markers $txnMarker to the brokers")
+        txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
+
+          case Left(Errors.NOT_COORDINATOR) =>
+            info(s"I am no longer the coordinator for $transactionalId; cancel sending transaction markers $txnMarker to the brokers")
 
             txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
 
-          case Some(epochAndMetadata) =>
+          case Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) =>
+            info(s"I am loading the transaction partition that contains $transactionalId which means the current markers have to be obsoleted; " +
+              s"cancel sending transaction markers $txnMarker to the brokers")
+
+            txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+
+          case Right(None) =>
+            throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
+              s"no metadata in the cache; this is not expected")
+
+          case Right(Some(epochAndMetadata)) =>
             if (epochAndMetadata.coordinatorEpoch != txnMarker.coordinatorEpoch) {
               // coordinator epoch has changed, just cancel it from the purgatory
               info(s"Transaction coordinator epoch for $transactionalId has changed from ${txnMarker.coordinatorEpoch} to " +
@@ -57,7 +67,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
 
               txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
             } else {
-              // re-enqueue the markers
+              // re-enqueue the markers with possibly new destination brokers
               trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
                 s"under coordinator epoch ${txnMarker.coordinatorEpoch}")
 
@@ -83,14 +93,23 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
         if (errors == null)
           throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for producer id ${txnMarker.producerId}")
 
-        txnStateManager.getTransactionState(transactionalId) match {
-          case None =>
-            info(s"Transaction metadata for $transactionalId does not exist in the cache" +
-              s"any more; cancel sending transaction markers $txnMarker to the brokers")
+        txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
+          case Left(Errors.NOT_COORDINATOR) =>
+            info(s"I am no longer the coordinator for $transactionalId; cancel sending transaction markers $txnMarker to the brokers")
 
             txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
 
-          case Some(epochAndMetadata) =>
+          case Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) =>
+            info(s"I am loading the transaction partition that contains $transactionalId which means the current markers have to be obsoleted; " +
+              s"cancel sending transaction markers $txnMarker to the brokers")
+
+            txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+
+          case Right(None) =>
+            throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
+              s"no metadata in the cache; this is not expected")
+
+          case Right(Some(epochAndMetadata)) =>
             val txnMetadata = epochAndMetadata.transactionMetadata
             val retryPartitions: mutable.Set[TopicPartition] = mutable.Set.empty[TopicPartition]
             var abortSending: Boolean = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index fef395a..e1abf0e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -140,7 +140,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
                                                var txnTimeoutMs: Int,
                                                var state: TransactionState,
                                                val topicPartitions: mutable.Set[TopicPartition],
-                                               var txnStartTimestamp: Long = -1,
+                                               @volatile var txnStartTimestamp: Long = -1,
                                                var txnLastUpdateTimestamp: Long) extends Logging {
 
   // pending state is used to indicate the state that this transaction is going to

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 7d1a571..b15077c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -20,13 +20,13 @@ import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.common.KafkaException
 import kafka.log.LogConfig
 import kafka.message.UncompressedCodec
 import kafka.server.ReplicaManager
-import kafka.utils.CoreUtils.inLock
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
@@ -70,10 +70,8 @@ class TransactionStateManager(brokerId: Int,
   /** shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
-  // TODO: we need to extend this lock as a read-write lock and reading access to it needs to be covered
-  // by the read lock
-  /** lock protecting access to loading and owned partition sets */
-  private val stateLock = new ReentrantLock()
+  /** lock protecting access to the transactional metadata cache, including loading and leaving partition sets */
+  private val stateLock = new ReentrantReadWriteLock()
 
   /** partitions of transaction topic that are being loaded, state lock should be called BEFORE accessing this set */
   private val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
@@ -87,13 +85,18 @@ class TransactionStateManager(brokerId: Int,
   /** number of partitions for the transaction log topic */
   private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
 
-  // this is best-effort expiration and hence not grabing the lock on metadata upon checking its state
-  // we will get the lock when actually trying to transit the transaction metadata to abort later.
+  // this is best-effort expiration of an ongoing transaction which has been open for more than its
+  // txn timeout value; we do not need to grab the lock on the metadata object upon checking its state,
+  // since the timestamp is volatile and we will get the lock when actually trying to transit the transaction
+  // metadata to abort later.
   def transactionsToExpire(): Iterable[TransactionalIdAndProducerIdEpoch] = {
     val now = time.milliseconds()
-    transactionMetadataCache.flatMap { case (_, entry) =>
-        entry.metadataPerTransactionalId.filter { case (txnId, txnMetadata) =>
-          if (isCoordinatorLoadingInProgress(txnId) || txnMetadata.pendingTransitionInProgress) {
+    inReadLock(stateLock) {
+      transactionMetadataCache.filter { case (txnPartitionId, _) =>
+        !leavingPartitions.exists(_.txnPartitionId == txnPartitionId)
+      }.flatMap { case (_, entry) =>
+        entry.metadataPerTransactionalId.filter { case (_, txnMetadata) =>
+          if (txnMetadata.pendingTransitionInProgress) {
             false
           } else {
             txnMetadata.state match {
@@ -105,6 +108,7 @@ class TransactionStateManager(brokerId: Int,
         }.map { case (txnId, txnMetadata) =>
           TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId, txnMetadata.producerEpoch)
         }
+      }
     }
   }
 
@@ -113,48 +117,46 @@ class TransactionStateManager(brokerId: Int,
   }
 
   /**
-   * Get the transaction metadata associated with the given transactional id, or null if not found
+   * Get the transaction metadata associated with the given transactional id, or an error if
+   * the coordinator does not own the transaction partition or is still loading it; if not found
+   * either return None or create new metadata and add it to the cache
+   *
+   * This function is covered by the state read lock
    */
-  def getTransactionState(transactionalId: String): Option[CoordinatorEpochAndTxnMetadata] = {
+  def getAndMaybeAddTransactionState(transactionalId: String,
+                                     createdTxnMetadata: Option[TransactionMetadata] = None): Either[Errors, Option[CoordinatorEpochAndTxnMetadata]]
+  = inReadLock(stateLock) {
     val partitionId = partitionFor(transactionalId)
 
-    // we only need to check leaving partition set but not loading partition set since there are three possible cases:
-    //    1) it is not in the loading partitions set, hence safe to return NONE
-    //    2) it is in the loading partitions with a smaller epoch, hence safe to return NONE
-    //    3) it is in the loading partition with a larger epoch, return NONE is also fine as it
-    //       indicates the metadata is not exist at least for now.
-    //
-    //    4) it is NOT possible to be in the loading partition with the same epoch
-    if (leavingPartitions.exists(_.txnPartitionId == partitionId))
-      return None
-
-    transactionMetadataCache.get(partitionId).flatMap { cacheEntry =>
-      cacheEntry.metadataPerTransactionalId.get(transactionalId) match {
-        case null => None
-        case txnMetadata => Some(CoordinatorEpochAndTxnMetadata(cacheEntry.coordinatorEpoch, txnMetadata))
-      }
-    }
-  }
+    if (loadingPartitions.exists(_.txnPartitionId == partitionId))
+      return Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
 
-  /**
-   * Add a new transaction metadata, or retrieve the metadata if it already exists with the associated transactional id
-   * along with the current coordinator epoch for that belonging transaction topic partition
-   */
-  def addTransaction(txnMetadata: TransactionMetadata): CoordinatorEpochAndTxnMetadata = {
-    val partitionId = partitionFor(txnMetadata.transactionalId)
+    if (leavingPartitions.exists(_.txnPartitionId == partitionId))
+      return Left(Errors.NOT_COORDINATOR)
 
     transactionMetadataCache.get(partitionId) match {
-      case Some(txnMetadataCacheEntry) =>
-        val currentTxnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.putIfNotExists(
-          txnMetadata.transactionalId, txnMetadata)
-        if (currentTxnMetadata != null) {
-          CoordinatorEpochAndTxnMetadata(txnMetadataCacheEntry.coordinatorEpoch, currentTxnMetadata)
-        } else {
-          CoordinatorEpochAndTxnMetadata(txnMetadataCacheEntry.coordinatorEpoch, txnMetadata)
+      case Some(cacheEntry) =>
+        cacheEntry.metadataPerTransactionalId.get(transactionalId) match {
+          case null =>
+            createdTxnMetadata match {
+              case None =>
+                Right(None)
+
+              case Some(txnMetadata) =>
+                val currentTxnMetadata = cacheEntry.metadataPerTransactionalId.putIfNotExists(transactionalId, txnMetadata)
+                if (currentTxnMetadata != null) {
+                  Right(Some(CoordinatorEpochAndTxnMetadata(cacheEntry.coordinatorEpoch, currentTxnMetadata)))
+                } else {
+                  Right(Some(CoordinatorEpochAndTxnMetadata(cacheEntry.coordinatorEpoch, txnMetadata)))
+                }
+            }
+
+          case currentTxnMetadata =>
+            Right(Some(CoordinatorEpochAndTxnMetadata(cacheEntry.coordinatorEpoch, currentTxnMetadata)))
         }
 
       case None =>
-        throw new IllegalStateException(s"The metadata cache entry for txn partition $partitionId does not exist.")
+        Left(Errors.NOT_COORDINATOR)
     }
   }
 
@@ -179,28 +181,6 @@ class TransactionStateManager(brokerId: Int,
 
   def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount
 
-  def isCoordinatorFor(txnTopicPartitionId: Int): Boolean = inLock(stateLock) {
-    transactionMetadataCache.contains(txnTopicPartitionId)
-  }
-
-  def isCoordinatorFor(transactionalId: String): Boolean = inLock(stateLock) {
-    val partitionId = partitionFor(transactionalId)
-    transactionMetadataCache.contains(partitionId)
-  }
-
-  def isCoordinatorLoadingInProgress(transactionalId: String): Boolean = inLock(stateLock) {
-    val partitionId = partitionFor(transactionalId)
-
-    // we only need to check loading partition set but not leaving partition set since there are three possible cases:
-    //    1) it is not in the leaving partitions set, hence safe to return true
-    //    2) it is in the leaving partitions with a smaller epoch than the latest loading epoch, hence safe to return NONE
-    //    3) it is in the leaving partition with a larger epoch, return true is also OK since the client will then retry
-    //       later be notified that this coordinator is no longer be the transaction coordinator for him
-    //
-    //    4) it is NOT possible to be in the leaving partition with the same epoch
-    loadingPartitions.exists(_.txnPartitionId == partitionId)
-  }
-
   /**
    * Gets the partition count of the transaction log topic from ZooKeeper.
    * If the topic does not exist, the default partition count is returned.
@@ -228,7 +208,7 @@ class TransactionStateManager(brokerId: Int,
         try {
           while (currOffset < logEndOffset
             && !shuttingDown.get()
-            && inLock(stateLock) {loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
+            && inReadLock(stateLock) {loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
               idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
             val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None,
               minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
@@ -267,9 +247,11 @@ class TransactionStateManager(brokerId: Int,
   }
 
   /**
-    * Add a transaction topic partition into the cache
-    */
-  def addLoadedTransactionsToCache(txnTopicPartition: Int, coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata]): Unit = {
+   * Add a transaction topic partition into the cache
+   *
+   * Make it package-private to be used only for unit tests.
+   */
+  private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int, coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata]): Unit = {
     val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch, metadataPerTransactionalId)
     val currentTxnMetadataCacheEntry = transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
 
@@ -294,7 +276,7 @@ class TransactionStateManager(brokerId: Int,
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
 
-    inLock(stateLock) {
+    inWriteLock(stateLock) {
       leavingPartitions.remove(partitionAndLeaderEpoch)
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
@@ -303,7 +285,7 @@ class TransactionStateManager(brokerId: Int,
       info(s"Loading transaction metadata from $topicPartition")
       val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
 
-      inLock(stateLock) {
+      inWriteLock(stateLock) {
         if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
           addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions)
 
@@ -345,13 +327,13 @@ class TransactionStateManager(brokerId: Int,
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
 
-    inLock(stateLock) {
+    inWriteLock(stateLock) {
       loadingPartitions.remove(partitionAndLeaderEpoch)
       leavingPartitions.add(partitionAndLeaderEpoch)
     }
 
     def removeTransactions() {
-      inLock(stateLock) {
+      inWriteLock(stateLock) {
         if (leavingPartitions.contains(partitionAndLeaderEpoch)) {
           transactionMetadataCache.remove(partitionId) match {
             case Some(txnMetadataCacheEntry) =>
@@ -388,7 +370,6 @@ class TransactionStateManager(brokerId: Int,
     val timestamp = time.milliseconds()
 
     val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, new SimpleRecord(timestamp, keyBytes, valueBytes))
-
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
     val recordsPerPartition = Map(topicPartition -> records)
 
@@ -445,8 +426,12 @@ class TransactionStateManager(brokerId: Int,
       if (responseError == Errors.NONE) {
         // now try to update the cache: we need to update the status in-place instead of
         // overwriting the whole object to ensure synchronization
-        getTransactionState(transactionalId) match {
-          case Some(epochAndMetadata) =>
+        getAndMaybeAddTransactionState(transactionalId) match {
+
+          case Left(err) =>
+            responseCallback(err)
+
+          case Right(Some(epochAndMetadata)) =>
             val metadata = epochAndMetadata.transactionMetadata
 
             metadata synchronized {
@@ -464,7 +449,7 @@ class TransactionStateManager(brokerId: Int,
               }
             }
 
-          case None =>
+          case Right(None) =>
             // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
             // return NOT_COORDINATOR to let the client re-discover the transaction coordinator
             info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
@@ -477,13 +462,45 @@ class TransactionStateManager(brokerId: Int,
       responseCallback(responseError)
     }
 
-    replicaManager.appendRecords(
-      newMetadata.txnTimeoutMs.toLong,
-      TransactionLog.EnforcedRequiredAcks,
-      internalTopicsAllowed = true,
-      isFromClient = false,
-      recordsPerPartition,
-      updateCacheCallback)
+    inReadLock(stateLock) {
+      // we need to hold the read lock on the transaction metadata cache until appending to local log returns;
+      // this is to avoid the case where an emigration followed by an immigration could have completed after the check
+      // returns and before appendRecords() is called, since otherwise entries with a high coordinator epoch could have
+      // been appended to the log in between these two events, and therefore appendRecords() would append entries with
+      // an old coordinator epoch that can still be successfully replicated on followers and put the log in a bad state.
+      getAndMaybeAddTransactionState(transactionalId) match {
+        case Left(err) =>
+          responseCallback(err)
+
+        case Right(None) =>
+          // the coordinator metadata has been removed, reply to client immediately with NOT_COORDINATOR
+          responseCallback(Errors.NOT_COORDINATOR)
+
+        case Right(Some(epochAndMetadata)) =>
+          val metadata = epochAndMetadata.transactionMetadata
+
+          metadata synchronized {
+            if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
+              // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
+              responseCallback(Errors.NOT_COORDINATOR)
+            } else {
+              // do not need to check the metadata object itself since no concurrent thread should be able to modify it
+              // under the same coordinator epoch, so directly append to txn log now
+
+              replicaManager.appendRecords(
+                newMetadata.txnTimeoutMs.toLong,
+                TransactionLog.EnforcedRequiredAcks,
+                internalTopicsAllowed = true,
+                isFromClient = false,
+                recordsPerPartition,
+                updateCacheCallback,
+                delayedProduceSyncObject = Some(newMetadata))
+
+              trace(s"Appended new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
+            }
+          }
+      }
+    }
   }
 
   def shutdown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index fe7dada..4ae1b13 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -119,9 +119,10 @@ object DelayedOperationPurgatory {
   def apply[T <: DelayedOperation](purgatoryName: String,
                                    brokerId: Int = 0,
                                    purgeInterval: Int = 1000,
-                                   reaperEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
+                                   reaperEnabled: Boolean = true,
+                                   timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
     val timer = new SystemTimer(purgatoryName)
-    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled)
+    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled, timerEnabled)
   }
 
 }
@@ -133,7 +134,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
                                                              timeoutTimer: Timer,
                                                              brokerId: Int = 0,
                                                              purgeInterval: Int = 1000,
-                                                             reaperEnabled: Boolean = true)
+                                                             reaperEnabled: Boolean = true,
+                                                             timerEnabled: Boolean = true)
         extends Logging with KafkaMetricsGroup {
 
   /* a list of operation watching keys */
@@ -217,7 +219,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     // if it cannot be completed by now and hence is watched, add to the expire queue also
     if (!operation.isCompleted) {
-      timeoutTimer.add(operation)
+      if (timerEnabled)
+        timeoutTimer.add(operation)
       if (operation.isCompleted) {
         // cancel the timer task
         operation.cancel()
@@ -253,6 +256,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    */
   def delayed: Int = timeoutTimer.size
 
+  /**
+    * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
+    */
   def cancelForKey(key: Any): List[T] = {
     inWriteLock(removeWatchersLock) {
       val watchers = watchersForKey.remove(key)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index f27dff3..5bc0b9b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -124,6 +124,22 @@ class DelayedProduce(delayMs: Long,
   }
 }
 
+/**
+ * If the responseCallback of this delayed produce object already synchronizes on a different object, then callers can
+ * use this extended delayed produce object so that tryComplete() is synchronized on that object instead of on the operation itself, avoiding dead-lock
+ */
+class SafeDelayedProduce(delayMs: Long,
+                         syncObject: Object,
+                         produceMetadata: ProduceMetadata,
+                         replicaManager: ReplicaManager,
+                         responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
+  extends DelayedProduce(delayMs, produceMetadata, replicaManager, responseCallback) {
+
+  override def safeTryComplete(): Boolean = syncObject synchronized {
+    tryComplete()
+  }
+}
+
 object DelayedProduceMetrics extends KafkaMetricsGroup {
 
   private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 197298c..b0d354b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -448,12 +448,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         // call the replica manager to append messages to the replicas
         replicaManager.appendRecords(
-          produceRequest.timeout.toLong,
-          produceRequest.acks,
-          internalTopicsAllowed,
+          timeout = produceRequest.timeout.toLong,
+          requiredAcks = produceRequest.acks,
+          internalTopicsAllowed = internalTopicsAllowed,
           isFromClient = true,
-          authorizedRequestInfo,
-          sendResponseCallback)
+          entriesPerPartition = authorizedRequestInfo,
+          responseCallback = sendResponseCallback)
 
         // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
         // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log
@@ -1515,7 +1515,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         internalTopicsAllowed = true,
         isFromClient = false,
         entriesPerPartition = controlRecords,
-        sendResponseCallback(producerId, marker.transactionResult))
+        responseCallback = sendResponseCallback(producerId, marker.transactionResult))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 47b6d69..7310a06 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -329,14 +329,16 @@ class ReplicaManager(val config: KafkaConfig,
 
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
-   * the callback function will be triggered either when timeout or the required acks are satisfied
+   * the callback function will be triggered either when the timeout is reached or when the required acks are satisfied;
+   * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
    */
   def appendRecords(timeout: Long,
                     requiredAcks: Short,
                     internalTopicsAllowed: Boolean,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
+                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                    delayedProduceSyncObject: Option[Object] = None) {
 
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
@@ -354,7 +356,13 @@ class ReplicaManager(val config: KafkaConfig,
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+        val delayedProduce = delayedProduceSyncObject match {
+          case Some(syncObject) =>
+            new SafeDelayedProduce(timeout, syncObject, produceMetadata, this, responseCallback)
+
+          case None =>
+            new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+        }
 
         // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
         val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
index efa0a3b..0a8209e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
@@ -17,14 +17,13 @@
 
 package kafka.coordinator.group
 
-
 import kafka.common.OffsetAndMetadata
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
 import kafka.utils._
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, TimestampType}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
 import org.easymock.{Capture, EasyMock, IAnswer}
@@ -1351,7 +1350,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
@@ -1433,7 +1433,9 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])
+    ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
@@ -1459,7 +1461,9 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])
+    ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6600a36/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 8318741..a8a8eae 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1142,7 +1142,9 @@ class GroupMetadataManagerTest {
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])
+    ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)

