kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
Date Tue, 17 Mar 2020 21:41:10 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 9d68b8e  KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
9d68b8e is described below

commit 9d68b8e3db2df135c799fc9523c99570d1ed6a26
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Mar 17 14:40:02 2020 -0700

    KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
    
    In prepareAddPartitions the txnStartTimestamp could be updated as updateTimestamp, which
is assumed to be always larger then the original startTimestamp. However, due to ntp time
shift the timer may go backwards and hence the newStartTimestamp be smaller than the original
one. Then later in completeTransitionTo the time check would fail with an IllegalStateException,
and the txn would not transit to Ongoing.
    
    An indirect result of this, is that this txn would NEVER be expired anymore because only
Ongoing ones would be checked for expiration.
    
    We should do the same as in #3286 to remove this check.
    
    Also added test coverage for both KAFKA-5415 and KAFKA-8803.
    
    Reviewers: Jason Gustafson<jason@confluent.io>
---
 .../transaction/TransactionMetadata.scala          |   3 +-
 .../transaction/TransactionMetadataTest.scala      | 215 ++++++++++++++++++---
 2 files changed, 194 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 4b57abf..24b418a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -378,8 +378,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
         case Ongoing => // from addPartitions
           if (!validProducerEpoch(transitMetadata) ||
             !topicPartitions.subsetOf(transitMetadata.topicPartitions) ||
-            txnTimeoutMs != transitMetadata.txnTimeoutMs ||
-            txnStartTimestamp > transitMetadata.txnStartTimestamp) {
+            txnTimeoutMs != transitMetadata.txnTimeoutMs) {
 
             throwStateTransitionFailure(transitMetadata)
           } else {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index 506e68d..85ee263 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.utils.MockTime
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.junit.Assert._
@@ -27,11 +28,11 @@ import scala.collection.mutable
 class TransactionMetadataTest {
 
   val time = new MockTime()
+  val producerId = 23423L
+  val transactionalId = "txnlId"
 
   @Test
   def testInitializeEpoch(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = RecordBatch.NO_PRODUCER_EPOCH
 
     val txnMetadata = new TransactionMetadata(
@@ -55,8 +56,6 @@ class TransactionMetadataTest {
 
   @Test
   def testNormalEpochBump(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -79,8 +78,6 @@ class TransactionMetadataTest {
 
   @Test(expected = classOf[IllegalStateException])
   def testBumpEpochNotAllowedIfEpochsExhausted(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -99,9 +96,197 @@ class TransactionMetadataTest {
   }
 
   @Test
+  def testTolerateUpdateTimeShiftDuringEpochBump(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Empty,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // let new time be smaller
+    val transitMetadata = txnMetadata.prepareIncrementProducerEpoch(30000, Option(producerEpoch),
time.milliseconds() - 1).right.get
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
+    assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateUpdateTimeResetDuringProducerIdRotation(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Empty,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // let new time be smaller
+    val transitMetadata = txnMetadata.prepareProducerIdRotation(producerId + 1, 30000, time.milliseconds()
- 1, recordLastEpoch = true)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(producerId + 1, txnMetadata.producerId)
+    assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+    assertEquals(0, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringAddPartitions(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Empty,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = time.milliseconds(),
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // let new time be smaller; when transting from Empty the start time would be updated
to the update-time
+    var transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic1",
0)), time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)), txnMetadata.topicPartitions)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+
+    // add another partition, check that in Ongoing state the start timestamp would not change
to update time
+    transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2",
0)), time.milliseconds() - 2)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new TopicPartition("topic2",
0)), txnMetadata.topicPartitions)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 2, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringPrepareCommit(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Ongoing,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // let new time be smaller
+    var transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareCommit, time.milliseconds()
- 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(PrepareCommit, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringPrepareAbort(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = Ongoing,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // let new time be smaller
+    var transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds()
- 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(PrepareAbort, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringCompleteCommit(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = PrepareCommit,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // let new time be smaller
+    var transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(CompleteCommit, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
+  def testTolerateTimeShiftDuringCompleteAbort(): Unit = {
+    val producerEpoch: Short = 1
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      lastProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = producerEpoch,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = 30000,
+      state = PrepareAbort,
+      topicPartitions = mutable.Set.empty,
+      txnStartTimestamp = 1L,
+      txnLastUpdateTimestamp = time.milliseconds())
+
+    // let new time be smaller
+    var transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+    txnMetadata.completeTransitionTo(transitMetadata)
+    assertEquals(CompleteAbort, txnMetadata.state)
+    assertEquals(producerId, txnMetadata.producerId)
+    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+    assertEquals(producerEpoch, txnMetadata.producerEpoch)
+    assertEquals(1L, txnMetadata.txnStartTimestamp)
+    assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+  }
+
+  @Test
   def testFenceProducerAfterEpochsExhausted(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -131,8 +316,6 @@ class TransactionMetadataTest {
 
   @Test(expected = classOf[IllegalStateException])
   def testFenceProducerNotAllowedIfItWouldOverflow(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = Short.MaxValue
 
     val txnMetadata = new TransactionMetadata(
@@ -151,8 +334,6 @@ class TransactionMetadataTest {
 
   @Test
   def testRotateProducerId(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -192,8 +373,6 @@ class TransactionMetadataTest {
 
   @Test
   def testAttemptedEpochBumpWithNewlyCreatedMetadata(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -217,8 +396,6 @@ class TransactionMetadataTest {
 
   @Test
   def testEpochBumpWithCurrentEpochProvided(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
 
     val txnMetadata = new TransactionMetadata(
@@ -242,8 +419,6 @@ class TransactionMetadataTest {
 
   @Test
   def testAttemptedEpochBumpWithLastEpoch(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
     val lastProducerEpoch = (producerEpoch - 1).toShort
 
@@ -268,8 +443,6 @@ class TransactionMetadataTest {
 
   @Test
   def testAttemptedEpochBumpWithFencedEpoch(): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = 735.toShort
     val lastProducerEpoch = (producerEpoch - 1).toShort
 
@@ -290,8 +463,6 @@ class TransactionMetadataTest {
   }
 
   private def testRotateProducerIdInOngoingState(state: TransactionState): Unit = {
-    val transactionalId = "txnlId"
-    val producerId = 23423L
     val producerEpoch = (Short.MaxValue - 1).toShort
 
     val txnMetadata = new TransactionMetadata(


Mime
View raw message