kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
Date Mon, 04 Mar 2019 22:12:43 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b0578b9  KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
b0578b9 is described below

commit b0578b93cd8122b72f20451ed136584e8e0a9198
Author: Bob Barrett <bob.barrett@outlook.com>
AuthorDate: Mon Mar 4 16:28:31 2019 -0500

    KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
    
    This patch fixes a bug in log dir reassignment where Partition.maybeReplaceCurrentWithFutureReplica
    would compare the entire LogEndOffsetMetadata of each replica to determine whether the reassignment
    has completed. If the active segments of the two replicas have different base offsets (for example,
    if the current replica had previously been cleaned and the future replica rolled segments at
    different points), the reassignment will never complete. The fix is to compare only the
    LogEndOffs [...]
    
    Reviewers: Jason Gustafson <jason@confluent.io>
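
For readers who want to see why the old equality check stalls, here is a minimal, self-contained Scala sketch. The OffsetMetadata case class, the object name, and the offsets below are illustrative assumptions chosen to mirror the new test, not the broker's actual LogOffsetMetadata, which carries additional fields.

    // Illustrative only: a simplified stand-in for the broker's log-end-offset metadata.
    case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long)

    object LogDirReassignmentSketch extends App {
      // Current replica: segments were rolled at offset 6, so its active segment starts at 6.
      val currentLEO = OffsetMetadata(messageOffset = 8L, segmentBaseOffset = 6L)
      // Future replica: built from a compacted copy that never rolled, so its active segment starts at 0.
      val futureLEO = OffsetMetadata(messageOffset = 8L, segmentBaseOffset = 0L)

      // Old check: structural equality on the full metadata never holds here, so the swap stalls.
      println(currentLEO == futureLEO)                              // false
      // Fixed check: compare only the message offsets, which have caught up.
      println(currentLEO.messageOffset == futureLEO.messageOffset)  // true
    }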
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 +--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 45 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f398f8b..60f05ce 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -236,14 +236,14 @@ class Partition(val topic: String,
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
-    if (futureReplicaLEO.contains(replica.logEndOffset)) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset.messageOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset.messageOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
         getReplica(Request.FutureLocalReplicaId) match {
           case Some(futureReplica) =>
-            if (replica.logEndOffset == futureReplica.logEndOffset) {
+            if (replica.logEndOffset.messageOffset == futureReplica.logEndOffset.messageOffset) {
               logManager.replaceCurrentWithFutureLog(topicPartition)
               replica.log = futureReplica.log
               futureReplica.log = None
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index e58028b..74c4f23 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -216,6 +216,51 @@ class PartitionTest {
     assertEquals(None, partition.getReplica(Request.FutureLocalReplicaId))
   }
 
+  // Verify that replacement works when the replicas have the same log end offset but different base offsets in the
+  // active segment
+  @Test
+  def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
+    // Write records with duplicate keys to current replica and roll at offset 6
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v2".getBytes),
+      new SimpleRecord("k1".getBytes, "v3".getBytes),
+      new SimpleRecord("k2".getBytes, "v4".getBytes),
+      new SimpleRecord("k2".getBytes, "v5".getBytes),
+      new SimpleRecord("k2".getBytes, "v6".getBytes)
+    ), leaderEpoch = 0)
+    log1.roll()
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k3".getBytes, "v7".getBytes),
+      new SimpleRecord("k4".getBytes, "v8".getBytes)
+    ), leaderEpoch = 0)
+
+    // Write to the future replica as if the log had been compacted, and do not roll the segment
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val buffer = ByteBuffer.allocate(1024)
+    var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+      TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
+    builder.appendWithOffset(2L, new SimpleRecord("k1".getBytes, "v3".getBytes))
+    builder.appendWithOffset(5L, new SimpleRecord("k2".getBytes, "v6".getBytes))
+    builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
+    builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
+
+    log2.appendAsFollower(builder.build())
+
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.getReplica(brokerId))
+    assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId))
+
+    assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
+  }
 
   @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {

