kafka-commits mailing list archives

From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7152; Avoid moving a replica out of isr if its LEO equals leader's LEO
Date Sat, 21 Jul 2018 23:53:12 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a7f29c  KAFKA-7152; Avoid moving a replica out of isr if its LEO equals leader's LEO
9a7f29c is described below

commit 9a7f29c1ede9f1ece67b73d2d6497d60f38fb62a
Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
AuthorDate: Sat Jul 21 16:52:54 2018 -0700

    KAFKA-7152; Avoid moving a replica out of isr if its LEO equals leader's LEO
    
    When there are many inactive partitions in the cluster, we observed constant churn of URPs
    even when followers can keep up with the leader's byte-in rate, because the leader broker
    frequently moves replicas of inactive partitions out of the ISR. This PR mitigates the issue
    by not moving a replica out of the ISR if the follower's LEO equals the leader's LEO.
    
    Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
    
    Reviewers: Dong Lin <lindong28@gmail.com>
    
    Closes #5412 from hzxa21/KAFKA-7152
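
A minimal, self-contained sketch of the new check, assuming a simplified (hypothetical)
ReplicaState type; the real code operates on kafka.cluster.Replica and LogOffsetMetadata
inside Partition.getOutOfSyncReplicas:

object IsrLagSketch {
  final case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // A follower counts as lagging only if it has NOT reached the leader's LEO AND it has
  // not caught up within the last maxLagMs milliseconds. A follower whose LEO equals the
  // leader's LEO is therefore never reported, even for an idle partition.
  def outOfSyncReplicas(leaderLeo: Long,
                        followers: Set[ReplicaState],
                        nowMs: Long,
                        maxLagMs: Long): Set[ReplicaState] =
    followers.filter { r =>
      r.logEndOffset != leaderLeo && (nowMs - r.lastCaughtUpTimeMs) > maxLagMs
    }

  def main(args: Array[String]): Unit = {
    val now = 10000L
    val idleButCaughtUp = ReplicaState(brokerId = 1, logEndOffset = 20L, lastCaughtUpTimeMs = now - 500L)
    val trulyLagging    = ReplicaState(brokerId = 2, logEndOffset = 15L, lastCaughtUpTimeMs = now - 500L)
    // With maxLagMs = 100, only broker 2 is reported as out of sync.
    println(outOfSyncReplicas(leaderLeo = 20L,
                              followers = Set(idleButCaughtUp, trulyLagging),
                              nowMs = now,
                              maxLagMs = 100L))
  }
}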
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 ++-
 .../unit/kafka/server/ISRExpirationTest.scala      | 62 +++++++++++++++++-----
 2 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b80c344..154a8f9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -553,7 +553,8 @@ class Partition(val topic: String,
 
   def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
     /**
-     * there are two cases that will be handled here -
+     * If the follower already has the same leo as the leader, it will not be considered as out-of-sync,
+     * otherwise there are two cases that will be handled here -
      * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
      *                     the follower is stuck and should be removed from the ISR
      * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
@@ -565,7 +566,8 @@ class Partition(val topic: String,
      **/
     val candidateReplicas = inSyncReplicas - leaderReplica
 
-    val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
+    val laggingReplicas = candidateReplicas.filter(r =>
+      r.logEndOffset.messageOffset != leaderReplica.logEndOffset.messageOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
     if (laggingReplicas.nonEmpty)
       debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))
 
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 8212ed6..c90a5b9 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -40,6 +40,7 @@ class IsrExpirationTest {
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
+  val leaderLogEndOffset = 20
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -81,12 +82,12 @@ class IsrExpirationTest {
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
-    // let the follower catch up to the Leader logEndOffset (15)
+    // let the follower catch up to the Leader logEndOffset - 1
     for (replica <- partition0.assignedReplicas - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                                                    highWatermark = 15L,
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+                                                    highWatermark = leaderLogEndOffset - 1,
                                                     leaderLogStartOffset = 0L,
-                                                    leaderLogEndOffset = 15L,
+                                                    leaderLogEndOffset = leaderLogEndOffset,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1,
@@ -138,10 +139,10 @@ class IsrExpirationTest {
 
     // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
     for (replica <- partition0.assignedReplicas - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
-                                                    highWatermark = 10L,
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 2), MemoryRecords.EMPTY),
+                                                    highWatermark = leaderLogEndOffset - 2,
                                                     leaderLogStartOffset = 0L,
-                                                    leaderLogEndOffset = 15L,
+                                                    leaderLogEndOffset = leaderLogEndOffset,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1,
@@ -155,10 +156,10 @@ class IsrExpirationTest {
     time.sleep(75)
 
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
-                            highWatermark = 11L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+                            highWatermark = leaderLogEndOffset - 1,
                             leaderLogStartOffset = 0L,
-                            leaderLogEndOffset = 15L,
+                            leaderLogEndOffset = leaderLogEndOffset,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1,
@@ -175,10 +176,10 @@ class IsrExpirationTest {
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                            highWatermark = 15L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+                            highWatermark = leaderLogEndOffset,
                             leaderLogStartOffset = 0L,
-                            leaderLogEndOffset = 15L,
+                            leaderLogEndOffset = leaderLogEndOffset,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1,
@@ -190,6 +191,40 @@ class IsrExpirationTest {
     EasyMock.verify(log)
   }
 
+  /*
+   * Test the case where a follower has already caught up with same log end offset with the leader. This follower should not be considered as out-of-sync
+   */
+  @Test
+  def testIsrExpirationForCaughtUpFollowers() {
+    val log = logMock
+
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // let the follower catch up to the Leader logEndOffset
+    for (replica <- partition0.assignedReplicas - leaderReplica)
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+        highWatermark = leaderLogEndOffset,
+        leaderLogStartOffset = 0L,
+        leaderLogEndOffset = leaderLogEndOffset,
+        followerLogStartOffset = 0L,
+        fetchTimeMs = time.milliseconds,
+        readSize = -1,
+        lastStableOffset = None))
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    // let some time pass
+    time.sleep(150)
+
+    // even though follower hasn't pulled any data for > replicaMaxLagTimeMs ms, the follower has already caught up. So it is not out-of-sync.
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId = config.brokerId
@@ -222,6 +257,7 @@ class IsrExpirationTest {
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
+    EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.replay(log)
     log
   }
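
To make the behavioral change concrete, the snippet below contrasts the pre- and post-patch
conditions for an idle partition whose follower already holds the leader's LEO (the numbers
are invented for illustration; the real evaluation happens per replica inside
Partition.getOutOfSyncReplicas). Under the old check the replica would be reported as lagging
once its recorded catch-up time is older than replica.lag.time.max.ms; under the new check it
stays in sync, which is what testIsrExpirationForCaughtUpFollowers above verifies.

object OldVsNewCheck extends App {
  val maxLagMs       = 100L            // replica.lag.time.max.ms
  val nowMs          = 10000L
  val leaderLeo      = 20L
  val followerLeo    = 20L             // follower already holds everything the leader has
  val lastCaughtUpMs = nowMs - 500L    // but its recorded catch-up time is stale

  val oldCheckSaysLagging = (nowMs - lastCaughtUpMs) > maxLagMs
  val newCheckSaysLagging = followerLeo != leaderLeo && (nowMs - lastCaughtUpMs) > maxLagMs

  println(s"old check: $oldCheckSaysLagging, new check: $newCheckSaysLagging")  // true, false
}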

