kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7298; Raise UnknownProducerIdException if next sequence number is unknown (#5518)
Date Mon, 20 Aug 2018 16:00:29 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8325046  KAFKA-7298; Raise UnknownProducerIdException if next sequence number is
unknown (#5518)
8325046 is described below

commit 8325046be2f6577c58fdff7366835308a81718f4
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Aug 20 09:00:21 2018 -0700

    KAFKA-7298; Raise UnknownProducerIdException if next sequence number is unknown (#5518)
    
    If the only producer state left in the log is a transaction marker, then we do not know
the next expected sequence number. This can happen if there is a call to DeleteRecords which
arrives prior to the writing of the marker. Currently we raise an OutOfOrderSequence error
when this happens, but this is treated as a fatal error by the producer. Raising UnknownProducerId
instead allows the producer to check for truncation using the last acknowledged sequence number
and reset if possible.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../scala/kafka/log/ProducerStateManager.scala     | 10 +++++---
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 29 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 49c887b..2f71123 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -234,9 +234,13 @@ private[log] class ProducerAppendInfo(val producerId: Long,
         RecordBatch.NO_SEQUENCE
 
       if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) {
-        // the epoch was bumped by a control record, so we expect the sequence number to
be reset
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: found $appendFirstSeq " +
-          s"(incoming seq. number), but expected 0")
+        // We have a matching epoch, but we do not know the next sequence number. This case
can happen if
+        // only a transaction marker is left in the log for this producer. We treat this
as an unknown
+        // producer id error, so that the producer can check the log start offset for truncation
and reset
+        // the sequence number. Note that this check follows the fencing check, so the marker
still fences
+        // old producers even if it cannot determine our next expected sequence number.
+        throw new UnknownProducerIdException(s"Local producer state matches expected epoch
$producerEpoch " +
+          s"for producerId=$producerId, but next expected sequence number is not known.")
       } else if (!inSequence(currentLastSeq, appendFirstSeq)) {
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: $appendFirstSeq " +
           s"(incoming seq. number), $currentLastSeq (current end sequence number)")
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 053aed7..f9f4a23 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -82,6 +82,35 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testAppendTxnMarkerWithNoProducerState(): Unit = {
+    val producerEpoch = 2.toShort
+    appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT,
offset = 27L)
+
+    val firstEntry = stateManager.lastEntry(producerId).getOrElse(fail("Expected last entry
to be defined"))
+    assertEquals(producerEpoch, firstEntry.producerEpoch)
+    assertEquals(producerId, firstEntry.producerId)
+    assertEquals(RecordBatch.NO_SEQUENCE, firstEntry.lastSeq)
+
+    // Fencing should continue to work even if the marker is the only thing left
+    assertThrows[ProducerFencedException] {
+      append(stateManager, producerId, 0.toShort, 0, 0L, 4L)
+    }
+
+    // If the transaction marker is the only thing left in the log, then an attempt to write
using a
+    // non-zero sequence number should cause an UnknownProducerId, so that the producer can
reset its state
+    assertThrows[UnknownProducerIdException] {
+      append(stateManager, producerId, producerEpoch, 17, 0L, 4L)
+    }
+
+    // The broker should accept the request if the sequence number is reset to 0
+    append(stateManager, producerId, producerEpoch, 0, 39L, 4L)
+    val secondEntry = stateManager.lastEntry(producerId).getOrElse(fail("Expected last entry
to be defined"))
+    assertEquals(producerEpoch, secondEntry.producerEpoch)
+    assertEquals(producerId, secondEntry.producerId)
+    assertEquals(0, secondEntry.lastSeq)
+  }
+
+  @Test
   def testProducerSequenceWrapAround(): Unit = {
     val epoch = 15.toShort
     val sequence = Int.MaxValue


Mime
View raw message