kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] kafka git commit: KAFKA-5283; Handle producer epoch/sequence overflow
Date Fri, 02 Jun 2017 06:41:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0c3e466eb -> 1c882ee5f


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c882ee5/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 2094528..54246c4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -98,13 +98,13 @@ class TransactionStateManagerTest {
   def testAddGetPids() {
     transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String,
TransactionMetadata]())
 
-    assertEquals(Right(None), transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(None), transactionManager.getTransactionState(transactionalId1))
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
-      transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1)))
+      transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1))
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
-      transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+      transactionManager.getTransactionState(transactionalId1))
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
-      transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata2)))
+      transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata2))
   }
 
   @Test
@@ -160,11 +160,11 @@ class TransactionStateManagerTest {
     prepareTxnLog(topicPartition, startOffset, records)
 
     // this partition should not be part of the owned partitions
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold(
+    transactionManager.getTransactionState(transactionalId1).fold(
       err => assertEquals(Errors.NOT_COORDINATOR, err),
       _ => fail(transactionalId1 + "'s transaction state is already in the cache")
     )
-    transactionManager.getAndMaybeAddTransactionState(transactionalId2).fold(
+    transactionManager.getTransactionState(transactionalId2).fold(
       err => assertEquals(Errors.NOT_COORDINATOR, err),
       _ => fail(transactionalId2 + "'s transaction state is already in the cache")
     )
@@ -174,16 +174,16 @@ class TransactionStateManagerTest {
     // let the time advance to trigger the background thread loading
     scheduler.tick()
 
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold(
+    transactionManager.getTransactionState(transactionalId1).fold(
       err => fail(transactionalId1 + "'s transaction state access returns error " + err),
       entry => entry.getOrElse(fail(transactionalId1 + "'s transaction state was not loaded
into the cache"))
     )
 
-    val cachedPidMetadata1 = transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold(
+    val cachedPidMetadata1 = transactionManager.getTransactionState(transactionalId1).fold(
       err => fail(transactionalId1 + "'s transaction state access returns error " + err),
       entry => entry.getOrElse(fail(transactionalId1 + "'s transaction state was not loaded
into the cache"))
     )
-    val cachedPidMetadata2 = transactionManager.getAndMaybeAddTransactionState(transactionalId2).fold(
+    val cachedPidMetadata2 = transactionManager.getTransactionState(transactionalId2).fold(
       err => fail(transactionalId2 + "'s transaction state access returns error " + err),
       entry => entry.getOrElse(fail(transactionalId2 + "'s transaction state was not loaded
into the cache"))
     )
@@ -197,11 +197,11 @@ class TransactionStateManagerTest {
     // let the time advance to trigger the background thread removing
     scheduler.tick()
 
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold(
+    transactionManager.getTransactionState(transactionalId1).fold(
       err => assertEquals(Errors.NOT_COORDINATOR, err),
       _ => fail(transactionalId1 + "'s transaction state is still in the cache")
     )
-    transactionManager.getAndMaybeAddTransactionState(transactionalId2).fold(
+    transactionManager.getTransactionState(transactionalId2).fold(
       err => assertEquals(Errors.NOT_COORDINATOR, err),
       _ => fail(transactionalId2 + "'s transaction state is still in the cache")
     )
@@ -212,7 +212,7 @@ class TransactionStateManagerTest {
     transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String,
TransactionMetadata]())
 
     // first insert the initial transaction metadata
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1))
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1)
 
     prepareForTxnMessageAppend(Errors.NONE)
     expectedError = Errors.NONE
@@ -224,7 +224,7 @@ class TransactionStateManagerTest {
     // append the new metadata into log
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata,
assertCallback)
 
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     // append to log again with expected failures
@@ -236,22 +236,22 @@ class TransactionStateManagerTest {
 
     prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     // test NOT_COORDINATOR cases
@@ -259,7 +259,7 @@ class TransactionStateManagerTest {
 
     prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     // test Unknown cases
@@ -267,12 +267,12 @@ class TransactionStateManagerTest {
 
     prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata,
assertCallback)
-    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
   }
 
@@ -281,7 +281,7 @@ class TransactionStateManagerTest {
     transactionManager.addLoadedTransactionsToCache(partitionId, 0, new Pool[String, TransactionMetadata]())
 
     // first insert the initial transaction metadata
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1))
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1)
 
     prepareForTxnMessageAppend(Errors.NONE)
     expectedError = Errors.NOT_COORDINATOR
@@ -300,7 +300,7 @@ class TransactionStateManagerTest {
   def testAppendTransactionToLogWhilePendingStateChanged() = {
     // first insert the initial transaction metadata
     transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String,
TransactionMetadata]())
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1))
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1)
 
     prepareForTxnMessageAppend(Errors.NONE)
     expectedError = Errors.INVALID_PRODUCER_EPOCH
@@ -317,7 +317,7 @@ class TransactionStateManagerTest {
 
   @Test
   def shouldReturnNotCooridnatorErrorIfTransactionIdPartitionNotOwned(): Unit = {
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold(
+    transactionManager.getTransactionState(transactionalId1).fold(
       err => assertEquals(Errors.NOT_COORDINATOR, err),
       _ => fail(transactionalId1 + "'s transaction state is already in the cache")
     )
@@ -329,12 +329,12 @@ class TransactionStateManagerTest {
       transactionManager.addLoadedTransactionsToCache(partitionId, 0, new Pool[String, TransactionMetadata]())
     }
 
-    transactionManager.getAndMaybeAddTransactionState("ongoing", Some(transactionMetadata("ongoing",
producerId = 0, state = Ongoing)))
-    transactionManager.getAndMaybeAddTransactionState("not-expiring", Some(transactionMetadata("not-expiring",
producerId = 1, state = Ongoing, txnTimeout = 10000)))
-    transactionManager.getAndMaybeAddTransactionState("prepare-commit", Some(transactionMetadata("prepare-commit",
producerId = 2, state = PrepareCommit)))
-    transactionManager.getAndMaybeAddTransactionState("prepare-abort", Some(transactionMetadata("prepare-abort",
producerId = 3, state = PrepareAbort)))
-    transactionManager.getAndMaybeAddTransactionState("complete-commit", Some(transactionMetadata("complete-commit",
producerId = 4, state = CompleteCommit)))
-    transactionManager.getAndMaybeAddTransactionState("complete-abort", Some(transactionMetadata("complete-abort",
producerId = 5, state = CompleteAbort)))
+    transactionManager.putTransactionStateIfNotExists("ongoing", transactionMetadata("ongoing",
producerId = 0, state = Ongoing))
+    transactionManager.putTransactionStateIfNotExists("not-expiring", transactionMetadata("not-expiring",
producerId = 1, state = Ongoing, txnTimeout = 10000))
+    transactionManager.putTransactionStateIfNotExists("prepare-commit", transactionMetadata("prepare-commit",
producerId = 2, state = PrepareCommit))
+    transactionManager.putTransactionStateIfNotExists("prepare-abort", transactionMetadata("prepare-abort",
producerId = 3, state = PrepareAbort))
+    transactionManager.putTransactionStateIfNotExists("complete-commit", transactionMetadata("complete-commit",
producerId = 4, state = CompleteCommit))
+    transactionManager.putTransactionStateIfNotExists("complete-abort", transactionMetadata("complete-abort",
producerId = 5, state = CompleteAbort))
 
     time.sleep(2000)
     val expiring = transactionManager.timedOutTransactions()
@@ -401,7 +401,7 @@ class TransactionStateManagerTest {
   }
 
   private def verifyMetadataDoesExist(transactionalId: String) = {
-    transactionManager.getAndMaybeAddTransactionState(transactionalId, None) match {
+    transactionManager.getTransactionState(transactionalId) match {
       case Left(errors) => fail("shouldn't have been any errors")
       case Right(None) => fail("metadata should have been removed")
       case Right(Some(metadata)) => // ok
@@ -409,7 +409,7 @@ class TransactionStateManagerTest {
   }
 
   private def verifyMetadataDoesntExist(transactionalId: String) = {
-    transactionManager.getAndMaybeAddTransactionState(transactionalId, None) match {
+    transactionManager.getTransactionState(transactionalId) match {
       case Left(errors) => fail("shouldn't have been any errors")
       case Right(Some(metdata)) => fail("metadata should have been removed")
       case Right(None) => // ok
@@ -453,10 +453,10 @@ class TransactionStateManagerTest {
 
     txnMetadata1.txnLastUpdateTimestamp = time.milliseconds() - txnConfig.transactionalIdExpirationMs
     txnMetadata1.state = txnState
-    transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1))
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1)
 
     txnMetadata2.txnLastUpdateTimestamp = time.milliseconds()
-    transactionManager.getAndMaybeAddTransactionState(transactionalId2, Some(txnMetadata2))
+    transactionManager.putTransactionStateIfNotExists(transactionalId2, txnMetadata2)
 
     transactionManager.enableTransactionalIdExpiration()
     time.sleep(txnConfig.removeExpiredTransactionalIdsIntervalMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c882ee5/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 7227671..ac1d623 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -79,6 +79,33 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testProducerSequenceWrapAround(): Unit = {
+    val epoch = 15.toShort
+    val sequence = Int.MaxValue
+    val offset = 735L
+    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+
+    append(stateManager, producerId, epoch, 0, offset + 500)
+
+    val maybeLastEntry = stateManager.lastEntry(producerId)
+    assertTrue(maybeLastEntry.isDefined)
+
+    val lastEntry = maybeLastEntry.get
+    assertEquals(epoch, lastEntry.producerEpoch)
+    assertEquals(0, lastEntry.firstSeq)
+    assertEquals(0, lastEntry.lastSeq)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testProducerSequenceInvalidWrapAround(): Unit = {
+    val epoch = 15.toShort
+    val sequence = Int.MaxValue
+    val offset = 735L
+    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, 1, offset + 500)
+  }
+
+  @Test
   def testNoValidationOnFirstEntryWhenLoadingLog(): Unit = {
     val epoch = 5.toShort
     val sequence = 16


Mime
View raw message