kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5131; WriteTxnMarkers and complete commit/abort on partition immigration [Forced Update!]
Date Wed, 03 May 2017 00:18:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1cd01adcd -> 619fd7aeb (forced update)


KAFKA-5131; WriteTxnMarkers and complete commit/abort on partition immigration

Write txn markers and complete the commit/abort for transactions in the PrepareCommit or
PrepareAbort state during partition immigration.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Apurva Mehta <apurva@confluent.io>,
Ismael Juma <ismael@juma.me.uk>

Closes #2926 from dguy/kafka-5059


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/619fd7ae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/619fd7ae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/619fd7ae

Branch: refs/heads/trunk
Commit: 619fd7aeb62b2c1a68938a341ff81f728fc60090
Parents: b2fcf73
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed May 3 01:01:39 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed May 3 01:18:17 2017 +0100

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    | 112 ++++++++++---------
 .../transaction/TransactionStateManager.scala   |  24 +++-
 .../TransactionMarkerChannelManagerTest.scala   |   2 +-
 .../TransactionStateManagerTest.scala           |  33 +++++-
 4 files changed, 110 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/619fd7ae/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 2111a8f..46c061e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.transaction
 
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
 import kafka.utils.{Logging, Scheduler, ZkUtils}
@@ -28,8 +27,6 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.Time
-import kafka.utils.CoreUtils.inWriteLock
-
 
 object TransactionCoordinator {
 
@@ -253,7 +250,7 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
-      txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
+      txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch, writeTxnMarkers)
   }
 
   def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
@@ -322,61 +319,66 @@ class TransactionCoordinator(brokerId: Int,
       if (errors == Errors.NONE)
         txnManager.coordinatorEpochFor(transactionalId) match {
           case Some(coordinatorEpoch) =>
-            def completionCallback(error: Errors): Unit = {
-              error match {
-                case Errors.NONE =>
-                  txnManager.getTransactionState(transactionalId) match {
-                    case Some(preparedCommitMetadata) =>
-                      val completedState = if (nextState == PrepareCommit) CompleteCommit else CompleteAbort
-                      val committedMetadata = new TransactionMetadata(pid,
-                        epoch,
-                        preparedCommitMetadata.txnTimeoutMs,
-                        completedState,
-                        preparedCommitMetadata.topicPartitions,
-                        preparedCommitMetadata.transactionStartTime,
-                        time.milliseconds())
-                      preparedCommitMetadata.prepareTransitionTo(completedState)
-
-                      def writeCommittedTransactionCallback(error: Errors): Unit =
-                        error match {
-                          case Errors.NONE =>
-                            trace(s"completed txn for transactionalId: $transactionalId state after commit: ${txnManager.getTransactionState(transactionalId)}")
-                            txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
-                          case Errors.NOT_COORDINATOR =>
-                            // this one should be completed by the new coordinator
-                            warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                          case _ =>
-                            warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
-                            // retry until success
-                            txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                        }
-
-                      txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                    case None =>
-                      // this one should be completed by the new coordinator
-                      warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                  }
-                case Errors.NOT_COORDINATOR =>
-                  warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                case _ =>
-                  warn(s"error: $error caught when writing transaction markers for transactionalId: $transactionalId. retrying")
-                  txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId),
-                    newMetadata,
-                    coordinatorEpoch,
-                    completionCallback)
-              }
-            }
-
-            txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), newMetadata, coordinatorEpoch, completionCallback)
+            writeTxnMarkers(WriteTxnMarkerArgs(transactionalId, pid, epoch, nextState, newMetadata, coordinatorEpoch))
           case None =>
             // this one should be completed by the new coordinator
             warn(s"no longer the coordinator for transactionalId: $transactionalId")
         }
     }
-
     txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback)
   }
 
+  private def writeTxnMarkers(markerArgs: WriteTxnMarkerArgs): Unit = {
+    def completionCallback(error: Errors): Unit = {
+      error match {
+        case Errors.NONE =>
+          txnManager.getTransactionState(markerArgs.transactionalId) match {
+            case Some(preparedCommitMetadata) =>
+              val completedState = if (markerArgs.nextState == PrepareCommit) CompleteCommit else CompleteAbort
+              val committedMetadata = new TransactionMetadata(markerArgs.pid,
+                markerArgs.epoch,
+                preparedCommitMetadata.txnTimeoutMs,
+                completedState,
+                preparedCommitMetadata.topicPartitions,
+                preparedCommitMetadata.transactionStartTime,
+                time.milliseconds())
+              preparedCommitMetadata.prepareTransitionTo(completedState)
+
+              def writeCommittedTransactionCallback(error: Errors): Unit = {
+                error match {
+                  case Errors.NONE =>
+                    txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(markerArgs.transactionalId),
+                      markerArgs.pid)
+                  case Errors.NOT_COORDINATOR =>
+                    // this one should be completed by the new coordinator
+                    warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
+                  case _ =>
+                    warn(s"error: $error caught for transactionalId: ${markerArgs.transactionalId} when appending state: $completedState. Retrying.")
+                    // retry until success
+                    txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+                }
+              }
+              txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+            case None =>
+              // this one should be completed by the new coordinator
+              warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
+          }
+        case Errors.NOT_COORDINATOR =>
+          warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
+        case _ =>
+          warn(s"error: $error caught when writing transaction markers for transactionalId: ${markerArgs.transactionalId}. retrying")
+          txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId),
+            markerArgs.newMetadata,
+            markerArgs.coordinatorEpoch,
+            completionCallback)
+      }
+    }
+    txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId),
+      markerArgs.newMetadata,
+      markerArgs.coordinatorEpoch,
+      completionCallback)
+  }
+
   def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
 
   def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
@@ -408,4 +410,10 @@ class TransactionCoordinator(brokerId: Int,
   }
 }
 
-case class InitPidResult(pid: Long, epoch: Short, error: Errors)
\ No newline at end of file
+case class InitPidResult(pid: Long, epoch: Short, error: Errors)
+case class WriteTxnMarkerArgs(transactionalId: String,
+                              pid: Long,
+                              epoch: Short,
+                              nextState: TransactionState,
+                              newMetadata: TransactionMetadata,
+                              coordinatorEpoch: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/619fd7ae/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2e40a34..c8931c4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.{KafkaException, Topic}
 import kafka.log.LogConfig
@@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 
-import scala.collection.{concurrent, mutable}
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 
@@ -45,7 +45,7 @@ object TransactionManager {
 }
 
 /**
- * Transaction manager is part of the transaction coordinator, it manages:
+ * Transaction state manager is part of the transaction coordinator, it manages:
  *
  * 1. the transaction log, which is a special internal topic.
  * 2. the transaction metadata including its ongoing transaction status.
@@ -60,6 +60,8 @@ class TransactionStateManager(brokerId: Int,
 
   this.logIdent = "[Transaction Log Manager " + brokerId + "]: "
 
+  type WriteTxnMarkers = WriteTxnMarkerArgs => Unit
+
   /** shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
@@ -147,7 +149,7 @@ class TransactionStateManager(brokerId: Int,
     zkUtils.getTopicPartitionCount(Topic.TransactionStateTopicName).getOrElse(config.transactionLogNumPartitions)
   }
 
-  private def loadTransactionMetadata(topicPartition: TopicPartition) {
+  private def loadTransactionMetadata(topicPartition: TopicPartition, writeTxnMarkers: WriteTxnMarkers) {
     def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
 
     val startMs = time.milliseconds()
@@ -207,6 +209,16 @@ class TransactionStateManager(brokerId: Int,
 
                 throw new KafkaException("Loading transaction topic partition failed.")
               }
+              // if state is PrepareCommit or PrepareAbort we need to complete the transaction
+              if (currentTxnMetadata.state == PrepareCommit || currentTxnMetadata.state == PrepareAbort) {
+                writeTxnMarkers(WriteTxnMarkerArgs(transactionalId,
+                  txnMetadata.pid,
+                  txnMetadata.producerEpoch,
+                  txnMetadata.state,
+                  txnMetadata,
+                  coordinatorEpochFor(transactionalId).get
+                ))
+              }
           }
 
           removedTransactionalIds.foreach { transactionalId =>
@@ -229,7 +241,7 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and
    * populate the transaction metadata cache with the transactional ids.
    */
-  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int) {
+  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int, writeTxnMarkers: WriteTxnMarkers) {
     validateTransactionTopicPartitionCountIsStable()
 
     val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partition)
@@ -242,7 +254,7 @@ class TransactionStateManager(brokerId: Int,
     def loadTransactions() {
       info(s"Loading transaction metadata from $topicPartition")
       try {
-        loadTransactionMetadata(topicPartition)
+        loadTransactionMetadata(topicPartition, writeTxnMarkers)
       } catch {
         case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
       } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/619fd7ae/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 1c49151..29240a6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.api.{LeaderAndIsr, PartitionStateInfo}
-import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, InterBrokerSendThread}
+import kafka.common.{BrokerEndPointNotAvailableException, InterBrokerSendThread}
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.{MockTime, TestUtils}

http://git-wip-us.apache.org/repos/asf/kafka/blob/619fd7ae/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 2edcb8f..94dc12b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
     assertFalse(transactionManager.isCoordinatorFor(txnId1))
     assertFalse(transactionManager.isCoordinatorFor(txnId2))
 
-    transactionManager.loadTransactionsForPartition(partitionId, 0)
+    transactionManager.loadTransactionsForPartition(partitionId, 0, _ => ())
 
     // let the time advance to trigger the background thread loading
     scheduler.tick()
@@ -293,7 +293,7 @@ class TransactionStateManagerTest {
     val coordinatorEpoch = 10
     EasyMock.expect(replicaManager.getLog(EasyMock.anyObject(classOf[TopicPartition]))).andReturn(None)
     EasyMock.replay(replicaManager)
-    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch)
+    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch, _ => ())
     val epoch = transactionManager.coordinatorEpochFor(txnId1).get
     assertEquals(coordinatorEpoch, epoch)
   }
@@ -303,6 +303,34 @@ class TransactionStateManagerTest {
     assertEquals(None, transactionManager.coordinatorEpochFor(txnId1))
   }
 
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareCommit)
+  }
+
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedAbortState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareAbort)
+  }
+
+  private def verifyWritesTxnMarkersInPrepareState(state: TransactionState): Unit = {
+    txnMetadata1.state = state
+    txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+      new TopicPartition("topic1", 1)))
+
+    txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1))
+    val startOffset = 0L
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+
+    prepareTxnLog(topicPartition, 0, records)
+
+    var receivedArgs: WriteTxnMarkerArgs = null
+    transactionManager.loadTransactionsForPartition(partitionId, 0, markerArgs => receivedArgs = markerArgs)
+    scheduler.tick()
+
+    assertEquals(txnId1, receivedArgs.transactionalId)
+  }
+
   private def assertCallback(error: Errors): Unit = {
     assertEquals(expectedError, error)
   }
@@ -351,4 +379,5 @@ class TransactionStateManagerTest {
 
     EasyMock.replay(replicaManager)
   }
+
 }


Mime
View raw message