kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-5249; Fix incorrect producer snapshot offsets when recovering segments
Date Mon, 15 May 2017 22:23:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4c75f31a5 -> 7bb551b4a


KAFKA-5249; Fix incorrect producer snapshot offsets when recovering segments

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3060 from hachikuji/KAFKA-5249


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7bb551b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7bb551b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7bb551b4

Branch: refs/heads/trunk
Commit: 7bb551b4a1737f1819e11e08248b1f2277a0680e
Parents: 4c75f31
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon May 15 15:23:26 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon May 15 15:23:26 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   1 +
 core/src/main/scala/kafka/log/LogSegment.scala  |   1 +
 .../scala/kafka/log/ProducerStateManager.scala  |  10 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   8 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 108 ++++++++++++++++++-
 .../kafka/log/ProducerStateManagerTest.scala    |  21 ++++
 6 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e2d9489..e3a21d1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -316,6 +316,7 @@ class Log(@volatile var dir: File,
       if (fetchDataInfo != null)
         loadProducersFromLog(stateManager, fetchDataInfo.records)
     }
+    stateManager.updateMapEndOffset(segment.baseOffset)
     val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
 
     // once we have recovered the segment's data, take a snapshot to ensure that we won't

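For context, the Log.scala change above seeds the producer state manager with the segment's base offset before recovery starts, so the snapshot taken once the segment has been recovered is attributed to the correct offset. Below is a minimal, self-contained sketch of that bookkeeping; StateTracker and snapshotFileName are illustrative stand-ins, not the real ProducerStateManager API:

    // Minimal model: the snapshot file name is derived from the tracked end offset,
    // so a stale value would produce a snapshot attributed to the wrong offset.
    object SnapshotOffsetSketch extends App {
      class StateTracker {                                       // hypothetical stand-in
        var mapEndOffset: Long = 0L
        def updateMapEndOffset(offset: Long): Unit = mapEndOffset = offset
        def snapshotFileName: String = "%020d.snapshot".format(mapEndOffset)
      }

      val tracker = new StateTracker
      val segmentBaseOffset = 5000L

      // Without seeding, a snapshot taken right after recovering only the last segment
      // would be written for offset 0 rather than the segment's base offset.
      tracker.updateMapEndOffset(segmentBaseOffset)
      println(tracker.snapshotFileName)                          // 00000000000000005000.snapshot
    }
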
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 6699143..cf3ef0e 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -156,6 +156,7 @@ class LogSegment(val log: FileRecords,
         updateTxnIndex(completedTxn, lastStableOffset)
       }
     }
+    producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
   }
 
   /**

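The LogSegment.scala change above advances the state manager's map end offset as each batch is replayed during recover(), so the final value lines up with the segment's next offset. A rough sketch of that loop, using an assumed Batch case class rather than Kafka's FileRecords batches:

    object RecoverEndOffsetSketch extends App {
      final case class Batch(baseOffset: Long, lastOffset: Long)   // assumed stand-in type

      def replay(batches: Seq[Batch], segmentBaseOffset: Long): Long = {
        var mapEndOffset = segmentBaseOffset
        batches.foreach { batch =>
          // ... producer state would be rebuilt from the batch here ...
          mapEndOffset = batch.lastOffset + 1   // mirrors updateMapEndOffset(batch.lastOffset + 1)
        }
        mapEndOffset
      }

      // A segment whose last batch ends at offset 107 leaves the end offset at 108, matching the
      // assertEquals(108L, stateManager.mapEndOffset) assertions in the LogSegmentTest changes below.
      println(replay(Seq(Batch(100L, 103L), Batch(104L, 107L)), 100L))   // 108
    }
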
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 02609b2..ba7c470 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -429,6 +429,12 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * or equal to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+    // remove all out of range snapshots
+    deleteSnapshotFiles { file =>
+      val offset = offsetFromFilename(file.getName)
+      offset > logEndOffset || offset <= logStartOffset
+    }
+
     if (logEndOffset != mapEndOffset) {
       producers.clear()
       ongoingTxns.clear()
@@ -436,10 +442,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
      // since we assume that the offset is less than or equal to the high watermark, it is
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
-      deleteSnapshotFiles { file =>
-        val offset = offsetFromFilename(file.getName)
-        offset > logEndOffset || offset <= logStartOffset
-      }
       loadFromSnapshot(logStartOffset, currentTimeMs)
     } else {
       evictUnretainedProducers(logStartOffset)

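The ProducerStateManager.scala change above moves the snapshot cleanup ahead of the mapEndOffset check, so snapshot files whose offsets fall outside (logStartOffset, logEndOffset] are removed even when the map end offset already matches the log end offset and no reload is needed. A small sketch of the retention rule, applying the same predicate to plain offsets instead of snapshot files:

    object SnapshotRetentionSketch extends App {
      // a snapshot taken at `offset` is out of range unless logStartOffset < offset <= logEndOffset
      def isOutOfRange(offset: Long, logStartOffset: Long, logEndOffset: Long): Boolean =
        offset > logEndOffset || offset <= logStartOffset

      // Snapshot offsets 1..5 with truncateAndReload(logStartOffset = 1, logEndOffset = 3):
      // only offsets 2 and 3 survive, matching testTruncateAndReloadRemovesOutOfRangeSnapshots below.
      println((1L to 5L).filterNot(isOutOfRange(_, 1L, 3L)).toList)   // List(2, 3)
    }
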
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c3da9b3..5db1ed6 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -302,7 +302,10 @@ class LogSegmentTest {
     segment.append(firstOffset = 107L, largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
 
-    segment.recover(64 * 1024, new ProducerStateManager(topicPartition, logDir))
+    var stateManager = new ProducerStateManager(topicPartition, logDir)
+    segment.recover(64 * 1024, stateManager)
+    assertEquals(108L, stateManager.mapEndOffset)
+
 
     var abortedTxns = segment.txnIndex.allAbortedTxns
     assertEquals(1, abortedTxns.size)
@@ -313,9 +316,10 @@ class LogSegmentTest {
     assertEquals(100L, abortedTxn.lastStableOffset)
 
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
-    val stateManager = new ProducerStateManager(topicPartition, logDir)
+    stateManager = new ProducerStateManager(topicPartition, logDir)
     stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
     segment.recover(64 * 1024, stateManager)
+    assertEquals(108L, stateManager.mapEndOffset)
 
     abortedTxns = segment.txnIndex.allAbortedTxns
     assertEquals(1, abortedTxns.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index d7ca029..8c330ed 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -178,7 +178,7 @@ class LogTest {
 
   @Test
   def testRebuildPidMapWithCompactedData() {
-    val log = createLog(2048, pidSnapshotIntervalMs = Int.MaxValue)
+    val log = createLog(2048)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -2319,6 +2319,110 @@ class LogTest {
   }
 
   @Test
+  def testRecoverOnlyLastSegment(): Unit = {
+    val log = createLog(128)
+    val epoch = 0.toShort
+
+    val pid1 = 1L
+    val pid2 = 2L
+    val pid3 = 3L
+    val pid4 = 4L
+
+    val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch)
+    val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch)
+    val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch)
+    val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch)
+
+    // mix transactional and non-transactional data
+    appendPid1(5) // nextOffset: 5
+    appendNonTransactionalAsLeader(log, 3) // 8
+    appendPid2(2) // 10
+    appendPid1(4) // 14
+    appendPid3(3) // 17
+    appendNonTransactionalAsLeader(log, 2) // 19
+    appendPid1(10) // 29
+    appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30
+    appendPid2(6) // 36
+    appendPid4(3) // 39
+    appendNonTransactionalAsLeader(log, 10) // 49
+    appendPid3(9) // 58
+    appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59
+    appendPid4(8) // 67
+    appendPid2(7) // 74
+    appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75
+    appendNonTransactionalAsLeader(log, 10) // 85
+    appendPid4(4) // 89
+    appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
+
+    // delete the last offset and transaction index files to force recovery
+    val lastSegment = log.logSegments.last
+    val recoveryPoint = lastSegment.baseOffset
+    lastSegment.index.delete()
+    lastSegment.txnIndex.delete()
+
+    log.close()
+
+    val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
+    val abortedTransactions = allAbortedTransactions(reloadedLog)
+    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
+  }
+
+  @Test
+  def testRecoverLastSegmentWithNoSnapshots(): Unit = {
+    val log = createLog(128)
+    val epoch = 0.toShort
+
+    val pid1 = 1L
+    val pid2 = 2L
+    val pid3 = 3L
+    val pid4 = 4L
+
+    val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch)
+    val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch)
+    val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch)
+    val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch)
+
+    // mix transactional and non-transactional data
+    appendPid1(5) // nextOffset: 5
+    appendNonTransactionalAsLeader(log, 3) // 8
+    appendPid2(2) // 10
+    appendPid1(4) // 14
+    appendPid3(3) // 17
+    appendNonTransactionalAsLeader(log, 2) // 19
+    appendPid1(10) // 29
+    appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30
+    appendPid2(6) // 36
+    appendPid4(3) // 39
+    appendNonTransactionalAsLeader(log, 10) // 49
+    appendPid3(9) // 58
+    appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59
+    appendPid4(8) // 67
+    appendPid2(7) // 74
+    appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75
+    appendNonTransactionalAsLeader(log, 10) // 85
+    appendPid4(4) // 89
+    appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
+
+    // delete all snapshot files
+    logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
+      file.delete()
+    }
+
+    // delete the last offset and transaction index files to force recovery. this should force us to rebuild
+    // the producer state from the start of the log
+    val lastSegment = log.logSegments.last
+    val recoveryPoint = lastSegment.baseOffset
+    lastSegment.index.delete()
+    lastSegment.txnIndex.delete()
+
+    log.close()
+
+    val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
+    val abortedTransactions = allAbortedTransactions(reloadedLog)
+    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
+  }
+
+  @Test
   def testTransactionIndexUpdatedThroughReplication(): Unit = {
     val epoch = 0.toShort
     val log = createLog(1024 * 1024)
@@ -2474,7 +2578,7 @@ class LogTest {
   private def createLog(messageSizeInBytes: Int, retentionMs: Int = -1, retentionBytes: Int = -1,
                         cleanupPolicy: String = "delete", messagesPerSegment: Int = 5,
                         maxPidExpirationMs: Int = 300000, pidExpirationCheckIntervalMs: Int = 30000,
-                        pidSnapshotIntervalMs: Int = 60000): Log = {
+                        recoveryPoint: Long = 0L): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, messageSizeInBytes * messagesPerSegment: Integer)
     logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bb551b4/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 353642b..ad26339 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -23,6 +23,7 @@ import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.junit.Assert._
@@ -227,6 +228,26 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testTruncateAndReloadRemovesOutOfRangeSnapshots(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, epoch, 0, 0L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 1, 1L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 2, 2L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 3, 3L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 4, 4L)
+    idMapping.takeSnapshot()
+
+    idMapping.truncateAndReload(1L, 3L, time.milliseconds())
+
+    assertEquals(Some(2L), idMapping.oldestSnapshotOffset)
+    assertEquals(Some(3L), idMapping.latestSnapshotOffset)
+  }
+
+  @Test
   def testTakeSnapshot(): Unit = {
     val epoch = 0.toShort
     append(idMapping, pid, 0, epoch, 0L, 0L)

