kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: Ensure producer state append exceptions are useful (#6591)
Date Thu, 18 Apr 2019 16:31:39 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new eae7ab9  MINOR: Ensure producer state append exceptions are useful (#6591)
eae7ab9 is described below

commit eae7ab995bb002510d1ce8b7b54fa0cfa82460d7
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Apr 18 08:38:43 2019 -0700

    MINOR: Ensure producer state append exceptions are useful (#6591)
    
    We should include partition/offset information when we raise exceptions during producer
    state validation. This saves a lot of the discovery work to figure out where the problem occurred.
    This patch also includes a new test case to verify additional coordinator fencing cases.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../scala/kafka/log/ProducerStateManager.scala     | 48 +++++++++++---------
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 52 ++++++++++++++++++----
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  4 +-
 3 files changed, 73 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 62f4f45..c3e29a2 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -183,7 +183,8 @@ private[log] class ProducerStateEntry(val producerId: Long,
  *                       should have ValidationType.None. Appends coming from a client for
produce requests should have
  *                       ValidationType.Full.
  */
-private[log] class ProducerAppendInfo(val producerId: Long,
+private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
+                                      val producerId: Long,
                                       val currentEntry: ProducerStateEntry,
                                       val validationType: ValidationType) {
   private val transactions = ListBuffer.empty[TxnMetadata]
@@ -193,35 +194,36 @@ private[log] class ProducerAppendInfo(val producerId: Long,
   updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
   updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
 
-  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int) = {
+  private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, offset: Long): Unit
= {
     validationType match {
       case ValidationType.None =>
 
       case ValidationType.EpochOnly =>
-        checkProducerEpoch(producerEpoch)
+        checkProducerEpoch(producerEpoch, offset)
 
       case ValidationType.Full =>
-        checkProducerEpoch(producerEpoch)
-        checkSequence(producerEpoch, firstSeq)
+        checkProducerEpoch(producerEpoch, offset)
+        checkSequence(producerEpoch, firstSeq, offset)
     }
   }
 
-  private def checkProducerEpoch(producerEpoch: Short): Unit = {
+  private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
     if (producerEpoch < updatedEntry.producerEpoch) {
-      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably
another producer " +
-        s"with a newer epoch. $producerEpoch (request epoch), ${updatedEntry.producerEpoch}
(server epoch)")
+      throw new ProducerFencedException(s"Producer's epoch at offset $offset is no longer
valid in " +
+        s"partition $topicPartition: $producerEpoch (request epoch), ${updatedEntry.producerEpoch}
(current epoch)")
     }
   }
 
-  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {
+  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit
= {
     if (producerEpoch != updatedEntry.producerEpoch) {
       if (appendFirstSeq != 0) {
         if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch:
$producerEpoch " +
-            s"(request epoch), $appendFirstSeq (seq. number)")
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch at
offset $offset in " +
+            s"partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq
(seq. number)")
         } else {
-          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId
on the broker. It is possible " +
-            s"that the last message with the producerId=$producerId has been removed due
to hitting the retention limit.")
+          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId
on the broker at offset $offset" +
+            s"in partition $topicPartition. It is possible that the last message with the
producerId=$producerId has " +
+            "been removed due to hitting the retention limit.")
         }
       }
     } else {
@@ -239,10 +241,12 @@ private[log] class ProducerAppendInfo(val producerId: Long,
         // the sequence number. Note that this check follows the fencing check, so the marker
still fences
         // old producers even if it cannot determine our next expected sequence number.
         throw new UnknownProducerIdException(s"Local producer state matches expected epoch
$producerEpoch " +
-          s"for producerId=$producerId, but next expected sequence number is not known.")
+          s"for producerId=$producerId at offset $offset in partition $topicPartition, but
the next expected " +
+          "sequence number is not known.")
       } else if (!inSequence(currentLastSeq, appendFirstSeq)) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: $appendFirstSeq " +
-          s"(incoming seq. number), $currentLastSeq (current end sequence number)")
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId at " +
+          s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number),
" +
+          s"$currentLastSeq (current end sequence number)")
       }
     }
   }
@@ -277,13 +281,14 @@ private[log] class ProducerAppendInfo(val producerId: Long,
              firstOffset: Long,
              lastOffset: Long,
              isTransactional: Boolean): Unit = {
-    maybeValidateAppend(epoch, firstSeq)
+    maybeValidateAppend(epoch, firstSeq, firstOffset)
     updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
 
     updatedEntry.currentTxnFirstOffset match {
       case Some(_) if !isTransactional =>
         // Received a non-transactional message while a transaction is active
-        throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId")
+        throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId
at " +
+          s"offset $firstOffset in partition $topicPartition")
 
       case None if isTransactional =>
         // Began a new transaction
@@ -298,10 +303,11 @@ private[log] class ProducerAppendInfo(val producerId: Long,
                          producerEpoch: Short,
                          offset: Long,
                          timestamp: Long): CompletedTxn = {
-    checkProducerEpoch(producerEpoch)
+    checkProducerEpoch(producerEpoch, offset)
 
     if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
-      throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch}
" +
+      throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId
$producerId at " +
+        s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} "
+
         s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
 
     updatedEntry.maybeUpdateEpoch(producerEpoch)
@@ -630,7 +636,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
         ValidationType.Full
 
     val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId))
-    new ProducerAppendInfo(producerId, currentEntry, validationToPerform)
+    new ProducerAppendInfo(topicPartition, producerId, currentEntry, validationToPerform)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index d8b4043..9a3739d 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3356,6 +3356,29 @@ class LogTest {
   }
 
   @Test
+  def testZombieCoordinatorFencedEmptyTransaction(): Unit = {
+    val pid = 1L
+    val epoch = 0.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+
+    val buffer = ByteBuffer.allocate(256)
+    val append = appendTransactionalToBuffer(buffer, pid, epoch, leaderEpoch = 1)
+    append(0, 10)
+    appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, ControlRecordType.COMMIT,
+      coordinatorEpoch = 0, leaderEpoch = 1)
+
+    buffer.flip()
+    log.appendAsFollower(MemoryRecords.readableRecords(buffer))
+
+    appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch
= 2, leaderEpoch = 1)
+    appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch
= 2, leaderEpoch = 1)
+    assertThrows[TransactionCoordinatorFencedException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch
= 1, leaderEpoch = 1)
+    }
+  }
+
+  @Test
   def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -3515,10 +3538,14 @@ class LogTest {
     }
   }
 
-  private def appendEndTxnMarkerAsLeader(log: Log, producerId: Long, producerEpoch: Short,
-                                         controlType: ControlRecordType, coordinatorEpoch:
Int = 0): Unit = {
+  private def appendEndTxnMarkerAsLeader(log: Log,
+                                         producerId: Long,
+                                         producerEpoch: Short,
+                                         controlType: ControlRecordType,
+                                         coordinatorEpoch: Int = 0,
+                                         leaderEpoch: Int = 0): Unit = {
     val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch
= coordinatorEpoch)
-    log.appendAsLeader(records, isFromClient = false, leaderEpoch = 0)
+    log.appendAsLeader(records, isFromClient = false, leaderEpoch = leaderEpoch)
   }
 
   private def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = {
@@ -3529,10 +3556,14 @@ class LogTest {
     log.appendAsLeader(records, leaderEpoch = 0)
   }
 
-  private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch:
Short): (Long, Int) => Unit = {
+  private def appendTransactionalToBuffer(buffer: ByteBuffer,
+                                          producerId: Long,
+                                          producerEpoch: Short,
+                                          leaderEpoch: Int = 0): (Long, Int) => Unit =
{
     var sequence = 0
     (offset: Long, numRecords: Int) => {
-      val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, producerId,
producerEpoch, sequence, true)
+      val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME,
+        offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch)
       for (seq <- sequence until sequence + numRecords) {
         val record = new SimpleRecord(s"$seq".getBytes)
         builder.append(record)
@@ -3543,10 +3574,15 @@ class LogTest {
     }
   }
 
-  private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch:
Short, offset: Long,
-                                         controlType: ControlRecordType, coordinatorEpoch:
Int = 0): Unit = {
+  private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer,
+                                         producerId: Long,
+                                         producerEpoch: Short,
+                                         offset: Long,
+                                         controlType: ControlRecordType,
+                                         coordinatorEpoch: Int = 0,
+                                         leaderEpoch: Int = 0): Unit = {
     val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), 0,
producerId, producerEpoch, marker)
+    MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), leaderEpoch,
producerId, producerEpoch, marker)
   }
 
   private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords:
Int): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 29de3cf..8968114 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -208,7 +208,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId),
ValidationType.Full)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId),
ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset,
isTransactional = true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset
= 990000L,
@@ -224,7 +224,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId),
ValidationType.Full)
+    val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId),
ValidationType.Full)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset,
isTransactional = true)
 
     // use some other offset to simulate a follower append where the log offset metadata
won't typically


Mime
View raw message