From commits-return-13308-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Mon Dec 23 23:29:58 2019 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id 2A21D19981 for ; Mon, 23 Dec 2019 23:29:58 +0000 (UTC) Received: (qmail 82082 invoked by uid 500); 23 Dec 2019 23:29:57 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 82005 invoked by uid 500); 23 Dec 2019 23:29:57 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 81996 invoked by uid 99); 23 Dec 2019 23:29:57 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Dec 2019 23:29:57 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 325EB8D80D; Mon, 23 Dec 2019 23:29:57 +0000 (UTC) Date: Mon, 23 Dec 2019 23:29:54 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.4 updated: KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157714379160.21995.5775676260001673110@gitbox.apache.org> From: jgus@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.4 X-Git-Reftype: branch X-Git-Oldrev: f93499e1caccee807df6694df42df295436335f3 X-Git-Newrev: 30f4fbf51a76cd232767802bd6368ea82f31f437 X-Git-Rev: 30f4fbf51a76cd232767802bd6368ea82f31f437 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 30f4fbf KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840) 30f4fbf is described below commit 30f4fbf51a76cd232767802bd6368ea82f31f437 Author: Dhruvil Shah AuthorDate: Mon Dec 23 15:20:40 2019 -0800 KAFKA-9307; Make transaction metadata loading resilient to previous errors (#7840) Allow transaction metadata to be reloaded, even if it already exists as of a previous epoch. This helps with cases where a previous become-follower transition failed to unload corresponding metadata. Reviewers: Jun Rao , Jason Gustafson --- .../transaction/TransactionCoordinator.scala | 10 +++- .../transaction/TransactionStateManager.scala | 56 +++++++++++----------- .../transaction/TransactionStateManagerTest.scala | 28 ++++++++++- 3 files changed, 63 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 3f6ec34..d646757 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -275,12 +275,18 @@ class TransactionCoordinator(brokerId: Int, } def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = { + // The operations performed during immigration must be resilient to any previous errors we saw or partial state we + // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue + // loading it. + txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId) + + // Now load the partition. txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend) } def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = { - txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch) - txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId) + txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch) + txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId) } private def logInvalidStateTransitionAndReturnError(transactionalId: String, diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 01a24d6..9a9ed73 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -86,13 +86,13 @@ class TransactionStateManager(brokerId: Int, private val stateLock = new ReentrantReadWriteLock() /** partitions of transaction topic that are being loaded, state lock should be called BEFORE accessing this set */ - private val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set() + private[transaction] val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set() /** partitions of transaction topic that are being removed, state lock should be called BEFORE accessing this set */ - private val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set() + private[transaction] val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set() /** transaction metadata cache indexed by assigned transaction topic partition ids */ - private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map() + private[transaction] val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map() /** number of partitions for the transaction log topic */ private val transactionTopicPartitionCount = getTransactionTopicPartitionCount @@ -367,31 +367,25 @@ class TransactionStateManager(brokerId: Int, /** * Add a transaction topic partition into the cache - * - * Make it package-private to be used only for unit tests. */ - private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int, coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata]): Unit = { - val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch, metadataPerTransactionalId) - val currentTxnMetadataCacheEntry = transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry) - - if (currentTxnMetadataCacheEntry.isDefined) { - val coordinatorEpoch = currentTxnMetadataCacheEntry.get.coordinatorEpoch - val metadataPerTxnId = currentTxnMetadataCacheEntry.get.metadataPerTransactionalId - val errorMsg = s"The metadata cache for txn partition $txnTopicPartition has already exist with epoch $coordinatorEpoch " + - s"and ${metadataPerTxnId.size} entries while trying to add to it; " + - s"this should not happen" - fatal(errorMsg) - throw new IllegalStateException(errorMsg) + private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int, + coordinatorEpoch: Int, + loadedTransactions: Pool[String, TransactionMetadata]): Unit = { + val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch, loadedTransactions) + val previousTxnMetadataCacheEntryOpt = transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry) + + previousTxnMetadataCacheEntryOpt.foreach { previousTxnMetadataCacheEntry => + warn(s"Unloaded transaction metadata $previousTxnMetadataCacheEntry from $txnTopicPartition as part of " + + s"loading metadata at epoch $coordinatorEpoch") } } /** - * When this broker becomes a leader for a transaction log partition, load this partition and - * populate the transaction metadata cache with the transactional ids. + * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction + * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from + * the previous loading / unloading operation. */ def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { - validateTransactionTopicPartitionCountIsStable() - val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch) @@ -401,7 +395,9 @@ class TransactionStateManager(brokerId: Int, } def loadTransactions(): Unit = { - info(s"Loading transaction metadata from $topicPartition") + info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch") + validateTransactionTopicPartitionCountIsStable() + val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch) inWriteLock(stateLock) { @@ -436,6 +432,8 @@ class TransactionStateManager(brokerId: Int, } } } + + info(s"Completed loading transaction metadata from $topicPartition for coordinator epoch $coordinatorEpoch") } scheduler.schedule(s"load-txns-for-partition-$topicPartition", () => loadTransactions) @@ -446,8 +444,6 @@ class TransactionStateManager(brokerId: Int, * that belong to that partition. */ def removeTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int): Unit = { - validateTransactionTopicPartitionCountIsStable() - val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch) @@ -461,11 +457,10 @@ class TransactionStateManager(brokerId: Int, if (leavingPartitions.contains(partitionAndLeaderEpoch)) { transactionMetadataCache.remove(partitionId) match { case Some(txnMetadataCacheEntry) => - info(s"Removed ${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction metadata for $topicPartition on follower transition") + info(s"Unloaded transaction metadata $txnMetadataCacheEntry for $topicPartition on become-follower transition") case None => - info(s"Trying to remove cached transaction metadata for $topicPartition on follower transition but there is no entries remaining; " + - s"it is likely that another process for removing the cached entries has just executed earlier before") + info(s"No cached transaction metadata found for $topicPartition during become-follower transition") } leavingPartitions.remove(partitionAndLeaderEpoch) @@ -658,7 +653,12 @@ class TransactionStateManager(brokerId: Int, } -private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String, TransactionMetadata]) +private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int, + metadataPerTransactionalId: Pool[String, TransactionMetadata]) { + override def toString: String = { + s"TxnMetadataCacheEntry(coordinatorEpoch=$coordinatorEpoch, numTransactionalEntries=${metadataPerTransactionalId.size})" + } +} private[transaction] case class CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int, transactionMetadata: TransactionMetadata) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index d152888..038d4c5 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -467,7 +467,33 @@ class TransactionStateManagerTest { verifyMetadataDoesExistAndIsUsable(transactionalId2) } - private def verifyMetadataDoesExistAndIsUsable(transactionalId: String) = { + @Test + def testSuccessfulReimmigration(): Unit = { + txnMetadata1.state = PrepareCommit + txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0), + new TopicPartition("topic1", 1))) + + txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit())) + val startOffset = 0L + val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*) + + prepareTxnLog(topicPartition, 0, records) + + // immigrate partition at epoch 0 + transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _, _) => ()) + assertEquals(0, transactionManager.loadingPartitions.size) + assertEquals(0, transactionManager.leavingPartitions.size) + + // Re-immigrate partition at epoch 1. This should be successful even though we didn't get to emigrate the partition. + prepareTxnLog(topicPartition, 0, records) + transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 1, (_, _, _, _, _) => ()) + assertEquals(0, transactionManager.loadingPartitions.size) + assertEquals(0, transactionManager.leavingPartitions.size) + assertTrue(transactionManager.transactionMetadataCache.get(partitionId).isDefined) + assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch) + } + + private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): Unit = { transactionManager.getTransactionState(transactionalId) match { case Left(errors) => fail("shouldn't have been any errors") case Right(None) => fail("metadata should have been removed")