kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5231: Bump up producer epoch when sending abort txn markers on InitPid
Date Thu, 18 May 2017 01:05:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b3a33ce4b -> c64cfd2e2


KAFKA-5231: Bump up producer epoch when sending abort txn markers on InitPid

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Jason Gustafson, Jun Rao

Closes #3066 from guozhangwang/K5231-bump-up-epoch-when-abort-txn


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c64cfd2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c64cfd2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c64cfd2e

Branch: refs/heads/trunk
Commit: c64cfd2e2bffd0a8fcfa515d3bbbe76416a89ee7
Parents: b3a33ce
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed May 17 18:05:12 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 17 18:05:12 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   2 +-
 .../clients/producer/internals/Sender.java      |   8 +-
 .../transaction/DelayedTxnMarker.scala          |   1 +
 .../transaction/TransactionCoordinator.scala    |  10 +-
 .../TransactionMarkerChannelManager.scala       |   6 +-
 ...nsactionMarkerRequestCompletionHandler.scala |  59 +++++++---
 .../transaction/TransactionMetadata.scala       |  18 ++-
 .../transaction/TransactionStateManager.scala   | 113 +++++++++++++------
 .../scala/kafka/server/DelayedOperation.scala   |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../kafka/api/TransactionsTest.scala            |   2 +-
 .../TransactionCoordinatorTest.scala            |   6 +-
 .../TransactionStateManagerTest.scala           |   2 +-
 13 files changed, 157 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 6a263cc..ed846cd 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -111,7 +111,7 @@
 
     <module name="ClassFanOutComplexity">
       <!-- default is 20 -->
-      <property name="max" value="35"/>
+      <property name="max" value="40"/>
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7180171..8dea9c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,12 +40,14 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -440,9 +442,11 @@ public class Sender implements Runnable {
      * Handle a produce response
      */
     private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
-        int correlationId = response.requestHeader().correlationId();
+        RequestHeader requestHeader = response.requestHeader();
+        int correlationId = requestHeader.correlationId();
         if (response.wasDisconnected()) {
-            log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
+            ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
+            log.trace("Cancelled {} request {} with correlation id {}  due to node {} being disconnected", api, requestHeader, correlationId, response.destination());
             for (ProducerBatch batch : batches.values())
                 completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
         } else if (response.versionMismatch() != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index 313087c..82c4a8c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -45,6 +45,7 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
     throw new IllegalStateException(s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
   }
 
+  // TODO: if we will always return NONE upon completion, we can remove the error code in the param
   override def onComplete(): Unit = completionCallback(Errors.NONE)
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 8148cb6..ebfbde5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -197,7 +197,7 @@ class TransactionCoordinator(brokerId: Int,
 
         case Ongoing =>
           // indicate to abort the current ongoing txn first
-          Right(coordinatorEpoch, txnMetadata.prepareNoTransit())
+          Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
       }
     }
   }
@@ -266,8 +266,8 @@ class TransactionCoordinator(brokerId: Int,
       txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
   }
 
-  def handleTxnEmigration(txnTopicPartitionId: Int) {
-      txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId)
+  def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
+      txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
       txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
   }
 
@@ -382,8 +382,8 @@ class TransactionCoordinator(brokerId: Int,
                     throw new IllegalStateException("Cannot find the metadata in coordinator's cache while it is still the leader of the txn topic partition")
                   } else {
                     // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
-                    info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
-                      s"has been appended to the log. The partition ${partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
+                    info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
+                      s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
 
                     Left(Errors.NOT_COORDINATOR)
                   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 90c9c42..9aa3e70 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -230,8 +230,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
             case None =>
               // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
               // we will stop appending the completed log entry to transaction topic as the new leader should be doing it.
-              info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
-                s"has been appended to the log. The partition ${txnStateManager.partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
+              info(s"Updating $transactionalId's transaction state (txn topic partition ${txnStateManager.partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
+                s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
           }
 
         case other =>
@@ -295,6 +295,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     txnMarkerPurgatory.cancelForKey(transactionalId)
   }
 
+  // FIXME: Currently, operations registered under partition in txnMarkerPurgatory
+  // are only cleaned during coordinator immigration, which happens rarely. This means potential memory leak
   def completeSendMarkersForTxnId(transactionalId: String): Unit = {
     txnMarkerPurgatory.checkAndComplete(transactionalId)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 39c7914..bcd95a1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -17,11 +17,10 @@
 
 package kafka.coordinator.transaction
 
-import kafka.server.DelayedOperationPurgatory
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.WriteTxnMarkersResponse
 
 import scala.collection.mutable
@@ -32,18 +31,44 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                                                 txnMarkerChannelManager: TransactionMarkerChannelManager,
                                                 txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry]) extends RequestCompletionHandler with Logging {
   override def onComplete(response: ClientResponse): Unit = {
-    val correlationId = response.requestHeader.correlationId
+    val requestHeader = response.requestHeader
+    val correlationId = requestHeader.correlationId
     if (response.wasDisconnected) {
-      trace(s"Cancelled request $response due to node ${response.destination} being disconnected")
-      // re-enqueue the markers
+      val api = ApiKeys.forId(requestHeader.apiKey)
+      val correlation = requestHeader.correlationId
+      trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected")
+
       for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+        val transactionalId = txnIdAndMarker.txnId
         val txnMarker = txnIdAndMarker.txnMarkerEntry
-        txnMarkerChannelManager.addTxnMarkersToBrokerQueue(txnIdAndMarker.txnId,
-          txnMarker.producerId(),
-          txnMarker.producerEpoch(),
-          txnMarker.transactionResult(),
-          txnMarker.coordinatorEpoch(),
-          txnMarker.partitions().toSet)
+
+        txnStateManager.getTransactionState(transactionalId) match {
+          case None =>
+            info(s"Transaction metadata for $transactionalId does not exist in the cache" +
+              s"any more; cancel sending transaction markers $txnMarker to the brokers")
+
+            txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+
+          case Some(epochAndMetadata) =>
+            if (epochAndMetadata.coordinatorEpoch != txnMarker.coordinatorEpoch) {
+              // coordinator epoch has changed, just cancel it from the purgatory
+              info(s"Transaction coordinator epoch for $transactionalId has changed from ${txnMarker.coordinatorEpoch} to " +
+                s"${epochAndMetadata.coordinatorEpoch}; cancel sending transaction markers $txnMarker to the brokers")
+
+              txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+            } else {
+              // re-enqueue the markers
+              trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
+                s"under coordinator epoch ${txnMarker.coordinatorEpoch}")
+
+              txnMarkerChannelManager.addTxnMarkersToBrokerQueue(transactionalId,
+                txnMarker.producerId,
+                txnMarker.producerEpoch,
+                txnMarker.transactionResult,
+                txnMarker.coordinatorEpoch,
+                txnMarker.partitions.toSet)
+            }
+        }
       }
     } else {
       trace(s"Received response $response from node ${response.destination} with correlation id $correlationId")
@@ -60,10 +85,9 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
 
         txnStateManager.getTransactionState(transactionalId) match {
           case None =>
-            info(s"Transaction topic partition for $transactionalId may likely has emigrated, as the corresponding metadata do not exist in the cache" +
+            info(s"Transaction metadata for $transactionalId does not exist in the cache" +
               s"any more; cancel sending transaction markers $txnMarker to the brokers")
 
-            // txn topic partition has likely emigrated, just cancel it from the purgatory
             txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
 
           case Some(epochAndMetadata) =>
@@ -121,13 +145,16 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
 
             if (!abortSending) {
               if (retryPartitions.nonEmpty) {
+                trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
+                  s"under coordinator epoch ${txnMarker.coordinatorEpoch}")
+
                 // re-enqueue with possible new leaders of the partitions
                 txnMarkerChannelManager.addTxnMarkersToBrokerQueue(
                   transactionalId,
-                  txnMarker.producerId(),
-                  txnMarker.producerEpoch(),
+                  txnMarker.producerId,
+                  txnMarker.producerEpoch,
                   txnMarker.transactionResult,
-                  txnMarker.coordinatorEpoch(),
+                  txnMarker.coordinatorEpoch,
                   retryPartitions.toSet)
               } else {
                 txnMarkerChannelManager.completeSendMarkersForTxnId(transactionalId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 0d176aa..d739b9a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -135,16 +135,26 @@ private[transaction] class TransactionMetadata(val producerId: Long,
   }
 
   def removePartition(topicPartition: TopicPartition): Unit = {
-    if (pendingState.isDefined && (state != PrepareCommit && state != PrepareAbort))
-      throw new IllegalStateException(s"Transation metadata's current state is $state, and its pending state is $state " +
+    if (state != PrepareCommit && state != PrepareAbort)
+      throw new IllegalStateException(s"Transaction metadata's current state is $state, and its pending state is $pendingState " +
         s"while trying to remove partitions whose txn marker has been sent, this is not expected")
 
     topicPartitions -= topicPartition
   }
 
-  def prepareNoTransit(): TransactionMetadataTransition =
-    // do not call transitTo as it will set the pending state
+  // this is visible for test only
+  def prepareNoTransit(): TransactionMetadataTransition = {
+    // do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state
     TransactionMetadataTransition(producerId, producerEpoch, txnTimeoutMs, state, topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
+  }
+
+  def prepareFenceProducerEpoch(): TransactionMetadataTransition = {
+    // bump up the epoch to let the txn markers be able to override the current producer epoch
+    producerEpoch = (producerEpoch + 1).toShort
+
+    // do not call transitTo as it will set the pending state, a follow-up call to abort the transaction will set its pending state
+    TransactionMetadataTransition(producerId, producerEpoch, txnTimeoutMs, state, topicPartitions.toSet, txnStartTimestamp, txnLastUpdateTimestamp)
+  }
 
   def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int,
                                     updateTimestamp: Long): TransactionMetadataTransition = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2327213..0952b5d 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -70,11 +70,16 @@ class TransactionStateManager(brokerId: Int,
   /** shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
+  // TODO: we need to extend this lock as a read-write lock and reading access to it needs to be covered
+  // by the read lock
   /** lock protecting access to loading and owned partition sets */
   private val stateLock = new ReentrantLock()
 
-  /** partitions of transaction topic that are being loaded, partition lock should be called BEFORE accessing this set */
-  private val loadingPartitions: mutable.Set[Int] = mutable.Set()
+  /** partitions of transaction topic that are being loaded, state lock should be called BEFORE accessing this set */
+  private val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+
+  /** partitions of transaction topic that are being removed, state lock should be called BEFORE accessing this set */
+  private val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
 
   /** transaction metadata cache indexed by assigned transaction topic partition ids */
   private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
@@ -113,6 +118,16 @@ class TransactionStateManager(brokerId: Int,
   def getTransactionState(transactionalId: String): Option[CoordinatorEpochAndTxnMetadata] = {
     val partitionId = partitionFor(transactionalId)
 
+    // we only need to check leaving partition set but not loading partition set since there are three possible cases:
+    //    1) it is not in the loading partitions set, hence safe to return NONE
+    //    2) it is in the loading partitions with a smaller epoch, hence safe to return NONE
+    //    3) it is in the loading partition with a larger epoch, return NONE is also fine as it
+    //       indicates the metadata is not exist at least for now.
+    //
+    //    4) it is NOT possible to be in the loading partition with the same epoch
+    if (leavingPartitions.exists(_.txnPartitionId == partitionId))
+      return None
+
     transactionMetadataCache.get(partitionId).flatMap { cacheEntry =>
       cacheEntry.metadataPerTransactionalId.get(transactionalId) match {
         case null => None
@@ -174,7 +189,15 @@ class TransactionStateManager(brokerId: Int,
 
   def isCoordinatorLoadingInProgress(transactionalId: String): Boolean = inLock(stateLock) {
     val partitionId = partitionFor(transactionalId)
-    loadingPartitions.contains(partitionId)
+
+    // we only need to check loading partition set but not leaving partition set since there are three possible cases:
+    //    1) it is not in the leaving partitions set, hence safe to return true
+    //    2) it is in the leaving partitions with a smaller epoch than the latest loading epoch, hence safe to return NONE
+    //    3) it is in the leaving partition with a larger epoch, return true is also OK since the client will then retry
+    //       later be notified that this coordinator is no longer be the transaction coordinator for him
+    //
+    //    4) it is NOT possible to be in the leaving partition with the same epoch
+    loadingPartitions.exists(_.txnPartitionId == partitionId)
   }
 
   /**
@@ -203,8 +226,9 @@ class TransactionStateManager(brokerId: Int,
 
         try {
           while (currOffset < logEndOffset
-            && loadingPartitions.contains(topicPartition.partition())
-            && !shuttingDown.get()) {
+            && !shuttingDown.get()
+            && inLock(stateLock) {loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
+              idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
             val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None,
               minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
             val memRecords = fetchDataInfo.records match {
@@ -275,38 +299,43 @@ class TransactionStateManager(brokerId: Int,
     validateTransactionTopicPartitionCountIsStable()
 
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+    val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
 
     inLock(stateLock) {
-      loadingPartitions.add(partitionId)
+      leavingPartitions.remove(partitionAndLeaderEpoch)
+      loadingPartitions.add(partitionAndLeaderEpoch)
     }
 
     def loadTransactions() {
       info(s"Loading transaction metadata from $topicPartition")
       val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
 
-      loadedTransactions.foreach {
-        case (transactionalId, txnMetadata) =>
-          val result = txnMetadata synchronized {
-            // if state is PrepareCommit or PrepareAbort we need to complete the transaction
-            txnMetadata.state match {
-              case PrepareAbort =>
-                Some(TransactionResult.ABORT, txnMetadata.prepareComplete(time.milliseconds()))
-              case PrepareCommit =>
-                Some(TransactionResult.COMMIT, txnMetadata.prepareComplete(time.milliseconds()))
-              case _ =>
-                // nothing need to be done
-                None
-            }
-          }
+      inLock(stateLock) {
+        if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
+          addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions)
+
+          loadedTransactions.foreach {
+            case (transactionalId, txnMetadata) =>
+              val result = txnMetadata synchronized {
+                // if state is PrepareCommit or PrepareAbort we need to complete the transaction
+                txnMetadata.state match {
+                  case PrepareAbort =>
+                    Some(TransactionResult.ABORT, txnMetadata.prepareComplete(time.milliseconds()))
+                  case PrepareCommit =>
+                    Some(TransactionResult.COMMIT, txnMetadata.prepareComplete(time.milliseconds()))
+                  case _ =>
+                    // nothing need to be done
+                    None
+                }
+              }
 
-          result.foreach { case (command, newMetadata) =>
-            sendTxnMarkers(transactionalId, coordinatorEpoch, command, txnMetadata, newMetadata)
+              result.foreach { case (command, newMetadata) =>
+                sendTxnMarkers(transactionalId, coordinatorEpoch, command, txnMetadata, newMetadata)
+              }
           }
-      }
 
-      inLock(stateLock) {
-        addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions)
-        loadingPartitions.remove(partitionId)
+          loadingPartitions.remove(partitionAndLeaderEpoch)
+        }
       }
     }
 
@@ -317,23 +346,31 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a follower for a transaction log partition, clear out the cache for corresponding transactional ids
    * that belong to that partition.
    */
-  def removeTransactionsForTxnTopicPartition(partitionId: Int) {
+  def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int) {
     validateTransactionTopicPartitionCountIsStable()
 
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+    val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+
+    inLock(stateLock) {
+      loadingPartitions.remove(partitionAndLeaderEpoch)
+      leavingPartitions.add(partitionAndLeaderEpoch)
+    }
 
     def removeTransactions() {
       inLock(stateLock) {
-        transactionMetadataCache.remove(partitionId) match {
-          case Some(txnMetadataCacheEntry) =>
-            info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
+        if (leavingPartitions.contains(partitionAndLeaderEpoch)) {
+          transactionMetadataCache.remove(partitionId) match {
+            case Some(txnMetadataCacheEntry) =>
+              info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition")
+
+            case None =>
+              info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there is no entries remaining; " +
+                s"it is likely that another process for removing the cached entries has just executed earlier before")
+          }
 
-          case None =>
-            info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there is no entries remaining; " +
-              s"it is likely that another process for removing the cached entries has just executed earlier before")
+          leavingPartitions.remove(partitionAndLeaderEpoch)
         }
-
-        loadingPartitions.remove(partitionId)
       }
     }
 
@@ -437,8 +474,8 @@ class TransactionStateManager(brokerId: Int,
           case None =>
             // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
             // return NOT_COORDINATOR to let the client re-discover the transaction coordinator
-            info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
-              s"has been appended to the log. The partition ${partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
+            info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
+              s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore")
 
             responseError = Errors.NOT_COORDINATOR
         }
@@ -480,3 +517,5 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I
                                                   removeExpiredTransactionsIntervalMs: Int = TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs)
 
 case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short)
+
+case class TransactionPartitionAndLeaderEpoch(txnPartitionId: Int, coordinatorEpoch: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 6401600..fe7dada 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -121,7 +121,7 @@ object DelayedOperationPurgatory {
                                    purgeInterval: Int = 1000,
                                    reaperEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
     val timer = new SystemTimer(purgatoryName)
-    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval)
+    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 31680b0..33b696a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -150,7 +150,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (partition.topic == GROUP_METADATA_TOPIC_NAME)
             groupCoordinator.handleGroupEmigration(partition.partitionId)
           else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
-            txnCoordinator.handleTxnEmigration(partition.partitionId)
+            txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
         }
       }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 3e19bb9..e8669e9 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -254,7 +254,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     }
   }
 
-  @Ignore @Test
+  @Test
   def testFencingOnSend() {
     val transactionalId = "my-t.id"
     val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 2f4f572..7271edd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -499,7 +499,7 @@ class TransactionCoordinatorTest {
       .andReturn(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))
       .anyTimes()
 
-    val originalMetadata = new TransactionMetadata(pid, epoch, txnTimeoutMs, Ongoing, partitions, 0, 0)
+    val originalMetadata = new TransactionMetadata(pid, (epoch + 1).toShort, txnTimeoutMs, Ongoing, partitions, 0, 0)
     EasyMock.expect(transactionManager.appendTransactionToLog(
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
@@ -521,11 +521,11 @@ class TransactionCoordinatorTest {
 
   @Test
   def shouldRemoveTransactionsForPartitionOnEmigration(): Unit = {
-    EasyMock.expect(transactionManager.removeTransactionsForTxnTopicPartition(0))
+    EasyMock.expect(transactionManager.removeTransactionsForTxnTopicPartition(0, coordinatorEpoch))
     EasyMock.expect(transactionMarkerChannelManager.removeMarkersForTxnTopicPartition(0))
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
-    coordinator.handleTxnEmigration(0)
+    coordinator.handleTxnEmigration(0, coordinatorEpoch)
 
     EasyMock.verify(transactionManager, transactionMarkerChannelManager)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 0d3263a..fb443ad 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -176,7 +176,7 @@ class TransactionStateManagerTest {
     assertTrue(transactionManager.isCoordinatorFor(txnId1))
     assertTrue(transactionManager.isCoordinatorFor(txnId2))
 
-    transactionManager.removeTransactionsForTxnTopicPartition(partitionId)
+    transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch)
 
     // let the time advance to trigger the background thread removing
     scheduler.tick()


Mime
View raw message