kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
Date Mon, 04 Mar 2019 21:28:42 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2ae0938  KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
2ae0938 is described below

commit 2ae0938592c005e3b8257141c32dceddbbc35ade
Author: Bob Barrett <bob.barrett@outlook.com>
AuthorDate: Mon Mar 4 16:28:31 2019 -0500

    KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
    
    This patch fixes a bug in log dir reassignment where Partition.maybeReplaceCurrentWithFutureReplica
    would compare the entire LogEndOffsetMetadata of each replica to determine whether the
    reassignment has completed. If the active segments of the two replicas have different base
    offsets (for example, if the current replica had previously been cleaned and the future
    replica rolled segments at different points), the reassignment will never complete. The fix
    is to compare only the message offset (LogEndOffsetMetadata.messageOffset) of each replica.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 +--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 46 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 3 deletions(-)
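
To see why the old check stalls, here is a minimal, editorial Scala sketch (not part of this
commit): LogOffsetMetadata is reduced to the three fields it carries in kafka.server, and the
offset values are invented. Case-class equality compares every field, so two replicas that have
caught up to the same message offset still compare unequal whenever their active segments were
rolled at different points:

    // Simplified stand-in for kafka.server.LogOffsetMetadata (illustration only).
    case class LogOffsetMetadata(messageOffset: Long,
                                 segmentBaseOffset: Long,
                                 relativePositionInSegment: Int)

    // Hypothetical state: both replicas end at offset 8, but the current
    // replica's active segment was rolled at offset 6 while the future
    // replica's single segment still starts at 0.
    val currentLEO = LogOffsetMetadata(messageOffset = 8L, segmentBaseOffset = 6L, relativePositionInSegment = 120)
    val futureLEO  = LogOffsetMetadata(messageOffset = 8L, segmentBaseOffset = 0L, relativePositionInSegment = 480)

    assert(currentLEO != futureLEO)                             // old check: never true, so the swap never happens
    assert(currentLEO.messageOffset == futureLEO.messageOffset) // fixed check: true, swap proceeds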

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4cedd5e..cf6b27f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -320,14 +320,14 @@ class Partition(val topicPartition: TopicPartition,
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = localReplicaOrException
-    val futureReplicaLEO = futureLocalReplica.map(_.logEndOffset)
-    if (futureReplicaLEO.contains(replica.logEndOffset)) {
+    val futureReplicaLEO = futureLocalReplica.map(_.logEndOffset.messageOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset.messageOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
         futureLocalReplica match {
           case Some(futureReplica) =>
-            if (replica.logEndOffset == futureReplica.logEndOffset) {
+            if (replica.logEndOffset.messageOffset == futureReplica.logEndOffset.messageOffset) {
               logManager.replaceCurrentWithFutureLog(topicPartition)
               replica.log = futureReplica.log
               futureReplica.log = None
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index cc2a311..3cdc2bf 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -198,6 +198,52 @@ class PartitionTest {
     assertEquals(None, partition.futureLocalReplica)
   }
 
+  // Verify that replacement works when the replicas have the same log end offset but different base offsets in the
+  // active segment
+  @Test
+  def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
+    // Write records with duplicate keys to current replica and roll at offset 6
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v2".getBytes),
+      new SimpleRecord("k1".getBytes, "v3".getBytes),
+      new SimpleRecord("k2".getBytes, "v4".getBytes),
+      new SimpleRecord("k2".getBytes, "v5".getBytes),
+      new SimpleRecord("k2".getBytes, "v6".getBytes)
+    ), leaderEpoch = 0)
+    log1.roll()
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k3".getBytes, "v7".getBytes),
+      new SimpleRecord("k4".getBytes, "v8".getBytes)
+    ), leaderEpoch = 0)
+
+    // Write to the future replica as if the log had been compacted, and do not roll the segment
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val buffer = ByteBuffer.allocate(1024)
+    var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+      TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
+    builder.appendWithOffset(2L, new SimpleRecord("k1".getBytes, "v3".getBytes))
+    builder.appendWithOffset(5L, new SimpleRecord("k2".getBytes, "v6".getBytes))
+    builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
+    builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
+
+    log2.appendAsFollower(builder.build())
+
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = Partition(topicPartition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.localReplica)
+    assertEquals(Some(futureReplica), partition.futureLocalReplica)
+
+    assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
+  }
+
   @Test
   def testFetchOffsetSnapshotEpochValidationForLeader(): Unit = {
     val leaderEpoch = 5
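
For reference, a rough recap (editorial, not part of the patch) of the offset layout the new test
builds, written as the assertions one could add to it. log1 and log2 are the logs created in the
test above; both end at offset 8, but only the current replica has rolled its active segment:

    // Current replica (log1): records at offsets 0..5, roll(), then 6..7
    //   -> logEndOffset = 8, active segment base offset = 6.
    // Future replica (log2): one never-rolled segment holding the surviving
    //   records at offsets 2, 5, 6 and 7 (as if compacted)
    //   -> logEndOffset = 8, active segment base offset = 0.
    assert(log1.logEndOffset == 8L && log2.logEndOffset == 8L)
    assert(log1.activeSegment.baseOffset == 6L) // rolled at offset 6
    assert(log2.activeSegment.baseOffset == 0L) // never rolled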

