kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5351: Reset pending state when returning an error in appendTransactionToLog
Date Thu, 01 Jun 2017 05:48:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e4a6b50de -> 049abe7ef


KAFKA-5351: Reset pending state when returning an error in appendTransactionToLog

Without this patch, future client retries would get the `CONCURRENT_TRANSACTIONS` error code
indefinitely, since the pending state wouldn't be cleared when the append to the log failed.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3184 from apurvam/KAFKA-5351-clear-pending-state-on-retriable-error


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/049abe7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/049abe7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/049abe7e

Branch: refs/heads/trunk
Commit: 049abe7efa17c9660fce7b57b4c235e24c72315c
Parents: e4a6b50
Author: Apurva Mehta <apurva@confluent.io>
Authored: Wed May 31 22:48:43 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 31 22:48:43 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    | 11 +++++++++
 .../transaction/TransactionMetadata.scala       |  7 ++++++
 .../transaction/TransactionStateManager.scala   | 25 +++++++++++++++++---
 .../TransactionStateManagerTest.scala           | 10 +++++++-
 4 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/049abe7e/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index f182420..44e32b1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -198,6 +198,9 @@ class TransactionCoordinator(brokerId: Int,
         case Ongoing =>
           // indicate to abort the current ongoing txn first
           Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
+        case Dead =>
+          throw new IllegalStateException(s"Found transactionalId $transactionalId with state
${txnMetadata.state}. " +
+            s"This is illegal as we should never have transitioned to this state.")
       }
     }
   }
@@ -326,6 +329,10 @@ class TransactionCoordinator(brokerId: Int,
                   logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state,
txnMarkerResult)
               case Empty =>
                 logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state,
txnMarkerResult)
+              case Dead =>
+                throw new IllegalStateException(s"Found transactionalId $transactionalId
with state ${txnMetadata.state}. " +
+                  s"This is illegal as we should never have transitioned to this state.")
+
             }
           }
       }
@@ -364,6 +371,10 @@ class TransactionCoordinator(brokerId: Int,
                             logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state,
txnMarkerResult)
                           else
                             Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
+                        case Dead =>
+                          throw new IllegalStateException(s"Found transactionalId $transactionalId
with state ${txnMetadata.state}. " +
+                            s"This is illegal as we should never have transitioned to this
state.")
+
                       }
                     }
                   } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/049abe7e/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 5956f1d..dbf0ec5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -307,6 +307,13 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
             txnStartTimestamp = transitMetadata.txnStartTimestamp
             topicPartitions.clear()
           }
+        case Dead =>
+          // The transactionalId was being expired. The completion of the operation should result in removal of
+          // the metadata from the cache, so we should never realistically transition to the dead state.
+          throw new IllegalStateException(s"TransactionalId : $transactionalId is trying
to complete a transition to " +
+            s"$toState. This means that the transactionalId was being expired, and the only
acceptable completion of " +
+            s"this operation is to remove the transaction metadata from the cache, not to
persist the $toState in the log.")
+
       }
 
       debug(s"TransactionalId $transactionalId complete transition from $state to $transitMetadata")

http://git-wip-us.apache.org/repos/asf/kafka/blob/049abe7e/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 19b9b91..05edefb 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -520,11 +520,9 @@ class TransactionStateManager(brokerId: Int,
                 // in this case directly return NOT_COORDINATOR to client and let it to re-discover
the transaction coordinator
                 info(s"Updating $transactionalId's transaction state to $newMetadata with
coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message
" +
                   s"has been appended to the log. The cached coordinator epoch has changed
to ${epochAndMetadata.coordinatorEpoch}")
-
                 responseError = Errors.NOT_COORDINATOR
               } else {
                 metadata.completeTransitionTo(newMetadata)
-
                 debug(s"Updating $transactionalId's transaction state to $newMetadata with
coordinator epoch $coordinatorEpoch for $transactionalId succeeded")
               }
             }
@@ -534,9 +532,30 @@ class TransactionStateManager(brokerId: Int,
             // return NOT_COORDINATOR to let the client re-discover the transaction coordinator
             info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)})
to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " +
               s"failed after the transaction message has been appended to the log since the
corresponding metadata does not exist in the cache anymore")
-
             responseError = Errors.NOT_COORDINATOR
         }
+      } else {
+        // Reset the pending state when returning an error, since there is no active transaction
for the transactional id at this point.
+        getAndMaybeAddTransactionState(transactionalId) match {
+          case Right(Some(epochAndTxnMetadata)) =>
+            val metadata = epochAndTxnMetadata.transactionMetadata
+            metadata synchronized {
+              if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
+                debug(s"TransactionalId ${metadata.transactionalId}, resetting pending state
since we are returning error $responseError")
+                metadata.pendingState = None
+              } else {
+                info(s"TransactionalId ${metadata.transactionalId} coordinator epoch changed
from " +
+                  s"${epochAndTxnMetadata.coordinatorEpoch} to $coordinatorEpoch after append
to log returned $responseError")
+              }
+            }
+          case Right(None) =>
+            // Do nothing here, since we want to return the original append error to the
user.
+            info(s"Found no metadata TransactionalId $transactionalId after append to log
returned error $responseError")
+          case Left(error) =>
+            // Do nothing here, since we want to return the original append error to the
user.
+            info(s"Retrieving metadata for transactionalId $transactionalId returned $error
after append to the log returned error $responseError")
+        }
+
       }
 
       responseCallback(responseError)

http://git-wip-us.apache.org/repos/asf/kafka/blob/049abe7e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 479f99b..2094528 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -225,6 +225,7 @@ class TransactionStateManagerTest {
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata,
assertCallback)
 
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     // append to log again with expected failures
     txnMetadata1.pendingState = None
@@ -236,18 +237,22 @@ class TransactionStateManagerTest {
     prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     // test NOT_COORDINATOR cases
     expectedError = Errors.NOT_COORDINATOR
@@ -255,17 +260,20 @@ class TransactionStateManagerTest {
     prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
-    // test NOT_COORDINATOR cases
+    // test Unknown cases
     expectedError = Errors.UNKNOWN
 
     prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
   }
 
   @Test


Mime
View raw message