kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends (#8782)
Date Wed, 03 Jun 2020 17:54:52 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 74dcc00  KAFKA-10080; Fix race condition on txn completion which can cause duplicate
appends (#8782)
74dcc00 is described below

commit 74dcc00301e357224cb2a259bee941bf16b384c3
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Jun 3 10:37:53 2020 -0700

    KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends (#8782)
    
    The method `maybeWriteTxnCompletion` is unsafe for concurrent calls. This can cause duplicate
attempts to write the completion record to the log, which can ultimately lead to illegal state
errors and possibly to correctness violations if another transaction had been started before
the duplicate was written. This patch fixes the problem by ensuring only one thread can successfully
remove the pending completion from the map.
    
    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
---
 .../TransactionMarkerChannelManager.scala          | 43 +++++------
 ...TransactionMarkerRequestCompletionHandler.scala |  2 +-
 .../TransactionMarkerChannelManagerTest.scala      | 87 +++++++++++++++++++---
 ...sactionMarkerRequestCompletionHandlerTest.scala |  2 +-
 4 files changed, 98 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index dd8bb98..6fe2575 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -214,13 +214,11 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
   }
 
-  private def writeTxnCompletion(pendingCommitTxn: PendingCompleteTxn): Unit = {
-    transactionsWithPendingMarkers.remove(pendingCommitTxn.transactionalId)
-
-    val transactionalId = pendingCommitTxn.transactionalId
-    val txnMetadata = pendingCommitTxn.txnMetadata
-    val newMetadata = pendingCommitTxn.newMetadata
-    val coordinatorEpoch = pendingCommitTxn.coordinatorEpoch
+  private def writeTxnCompletion(pendingCompleteTxn: PendingCompleteTxn): Unit = {
+    val transactionalId = pendingCompleteTxn.transactionalId
+    val txnMetadata = pendingCompleteTxn.txnMetadata
+    val newMetadata = pendingCompleteTxn.newMetadata
+    val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
 
     trace(s"Completed sending transaction markers for $transactionalId; begin transition
" +
       s"to ${newMetadata.txnState}")
@@ -242,7 +240,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
         if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
           debug(s"Sending $transactionalId's transaction markers for $txnMetadata with "
+
             s"coordinator epoch $coordinatorEpoch succeeded, trying to append complete transaction
log now")
-
           tryAppendToLog(PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata,
newMetadata))
         } else {
           info(s"The cached metadata $txnMetadata has changed to $epochAndMetadata after
" +
@@ -263,15 +260,13 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                           txnMetadata: TransactionMetadata,
                           newMetadata: TxnTransitMetadata): Unit = {
     val transactionalId = txnMetadata.transactionalId
-
-    val pendingCommitTxn = PendingCompleteTxn(
+    val pendingCompleteTxn = PendingCompleteTxn(
       transactionalId,
       coordinatorEpoch,
       txnMetadata,
-      newMetadata
-    )
+      newMetadata)
 
-    transactionsWithPendingMarkers.put(transactionalId, pendingCommitTxn)
+    transactionsWithPendingMarkers.put(transactionalId, pendingCompleteTxn)
     addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId,
       txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
     maybeWriteTxnCompletion(transactionalId)
@@ -285,15 +280,16 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
   }
 
-  private def maybeWriteTxnCompletion(transactionalId: String): Unit = {
-    Option(transactionsWithPendingMarkers.get(transactionalId)).foreach { pendingCommitTxn
=>
-      if (!hasPendingMarkersToWrite(pendingCommitTxn.txnMetadata)) {
-        writeTxnCompletion(pendingCommitTxn)
+  def maybeWriteTxnCompletion(transactionalId: String): Unit = {
+    Option(transactionsWithPendingMarkers.get(transactionalId)).foreach { pendingCompleteTxn
=>
+      if (!hasPendingMarkersToWrite(pendingCompleteTxn.txnMetadata) &&
+          transactionsWithPendingMarkers.remove(transactionalId, pendingCompleteTxn)) {
+        writeTxnCompletion(pendingCompleteTxn)
       }
     }
   }
 
-  private def tryAppendToLog(txnLogAppend: PendingCompleteTxn) = {
+  private def tryAppendToLog(txnLogAppend: PendingCompleteTxn): Unit = {
     // try to append to the transaction log
     def appendCallback(error: Errors): Unit =
       error match {
@@ -404,18 +400,17 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   def removeMarkersForTxnId(transactionalId: String): Unit = {
     transactionsWithPendingMarkers.remove(transactionalId)
   }
-
-  def completeSendMarkersForTxnId(transactionalId: String): Unit = {
-    maybeWriteTxnCompletion(transactionalId)
-  }
 }
 
 case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry)
 
-case class PendingCompleteTxn(transactionalId: String, coordinatorEpoch: Int, txnMetadata:
TransactionMetadata, newMetadata: TxnTransitMetadata) {
+case class PendingCompleteTxn(transactionalId: String,
+                              coordinatorEpoch: Int,
+                              txnMetadata: TransactionMetadata,
+                              newMetadata: TxnTransitMetadata) {
 
   override def toString: String = {
-    "TxnLogAppend(" +
+    "PendingCompleteTxn(" +
       s"transactionalId=$transactionalId, " +
       s"coordinatorEpoch=$coordinatorEpoch, " +
       s"txnMetadata=$txnMetadata, " +
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 263e155..66edc47 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -193,7 +193,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                   txnMarker.coordinatorEpoch,
                   retryPartitions.toSet)
               } else {
-                txnMarkerChannelManager.completeSendMarkersForTxnId(transactionalId)
+                txnMarkerChannelManager.maybeWriteTxnCompletion(transactionalId)
               }
             }
         }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 46d9c9d..f01caa7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -16,7 +16,10 @@
  */
 package kafka.coordinator.transaction
 
+import java.util
 import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.{Callable, Executors, Future}
 
 import kafka.common.RequestAndCompletionHandler
 import kafka.metrics.KafkaYammerMetrics
@@ -34,11 +37,12 @@ import org.junit.Test
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
+import scala.util.Try
 
 class TransactionMarkerChannelManagerTest {
   private val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
   private val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
-  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val txnStateManager: TransactionStateManager = EasyMock.mock(classOf[TransactionStateManager])
 
   private val partition1 = new TopicPartition("topic1", 0)
   private val partition2 = new TopicPartition("topic1", 1)
@@ -87,6 +91,70 @@ class TransactionMarkerChannelManagerTest {
   }
 
   @Test
+  def shouldOnlyWriteTxnCompletionOnce(): Unit = {
+    mockCache()
+
+    val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition1.topic),
+      EasyMock.eq(partition1.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
+
+    EasyMock.expect(txnStateManager.appendTransactionToLog(
+      EasyMock.eq(transactionalId2),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(expectedTransition),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
+      .andAnswer(() => {
+        txnMetadata2.completeTransitionTo(expectedTransition)
+        capturedErrorsCallback.getValue.apply(Errors.NONE)
+      }).once()
+
+    EasyMock.replay(txnStateManager, metadataCache)
+
+    var addMarkerFuture: Future[Try[Unit]] = null
+    val executor = Executors.newFixedThreadPool(1)
+    txnMetadata2.lock.lock()
+    try {
+      addMarkerFuture = executor.submit((() => {
+        Try(channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+            txnMetadata2, expectedTransition))
+      }): Callable[Try[Unit]])
+
+      val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+      val response = new WriteTxnMarkersResponse(
+        Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1,
Errors.NONE)))
+      val clientResponse = new ClientResponse(header, null, null,
+        time.milliseconds(), time.milliseconds(), false, null, null,
+        response)
+
+      TestUtils.waitUntilTrue(() => {
+        val requests = channelManager.drainQueuedTransactionMarkers()
+        if (requests.nonEmpty) {
+          assertEquals(1, requests.size)
+          val request = requests.head
+          request.handler.onComplete(clientResponse)
+          true
+        } else {
+          false
+        }
+      }, "Timed out waiting for expected WriteTxnMarkers request")
+    } finally {
+      txnMetadata2.lock.unlock()
+      executor.shutdown()
+    }
+
+    assertNotNull(addMarkerFuture)
+    assertTrue("Add marker task failed with exception " + addMarkerFuture.get().get,
+      addMarkerFuture.get().isSuccess)
+
+    EasyMock.verify(txnStateManager)
+  }
+
+  @Test
   def shouldGenerateEmptyMapWhenNoRequestsOutstanding(): Unit = {
     assertTrue(channelManager.generateRequests().isEmpty)
   }
@@ -153,7 +221,6 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.replay(metadataCache)
 
     channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
-    channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(1, channelManager.numTxnsWithPendingMarkers)
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers)
@@ -291,7 +358,7 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE,
0, "client", 1),
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS,
0, "client", 1),
         null, null, 0, 0, false, null, null, response))
     }
 
@@ -338,7 +405,7 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE,
0, "client", 1),
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS,
0, "client", 1),
         null, null, 0, 0, false, null, null, response))
     }
 
@@ -387,7 +454,7 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE,
0, "client", 1),
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS,
0, "client", 1),
         null, null, 0, 0, false, null, null, response))
     }
 
@@ -402,7 +469,7 @@ class TransactionMarkerChannelManagerTest {
     assertEquals(CompleteCommit, txnMetadata2.state)
   }
 
-  private def createPidErrorMap(errors: Errors) = {
+  private def createPidErrorMap(errors: Errors): util.HashMap[java.lang.Long, util.Map[TopicPartition,
Errors]] = {
     val pidMap = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
     val errorsMap = new java.util.HashMap[TopicPartition, Errors]()
     errorsMap.put(partition1, errors)
@@ -414,11 +481,11 @@ class TransactionMarkerChannelManagerTest {
   def shouldCreateMetricsOnStarting(): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
 
-    assertEquals(1, metrics.filter { case (k, _) =>
+    assertEquals(1, metrics.count { case (k, _) =>
       k.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize"
-    }.size)
-    assertEquals(1, metrics.filter { case (k, _) =>
+    })
+    assertEquals(1, metrics.count { case (k, _) =>
       k.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=LogAppendRetryQueueSize"
-    }.size)
+    })
   }
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 15b7a9e..5ae961b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -233,7 +233,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private def verifyCompleteDelayedOperationOnError(error: Errors): Unit = {
 
     var completed = false
-    EasyMock.expect(markerChannelManager.completeSendMarkersForTxnId(transactionalId))
+    EasyMock.expect(markerChannelManager.maybeWriteTxnCompletion(transactionalId))
       .andAnswer(() => completed = true)
       .once()
     EasyMock.replay(markerChannelManager)


Mime
View raw message