kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] 01/02: KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr (#8223)
Date Fri, 10 Apr 2020 02:41:56 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4bb3b0853f45d580947dbf1842a0fce10a164bb5
Author: Chia-Ping Tsai <chia7712@gmail.com>
AuthorDate: Fri Mar 20 07:49:35 2020 +0800

    KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr (#8223)
    
    Currently, when there is a leader change while a log dir reassignment is in progress, we do
not update the leader epoch in the partition state maintained by `ReplicaAlterLogDirsThread`.
This can lead to a FENCED_LEADER_EPOCH error, which causes the partition to be marked as
failed; that failure is permanent until the broker is restarted. This patch fixes the
problem by updating the epoch in `ReplicaAlterLogDirsThread` after receiving a new LeaderAndIsr
request from the controller.
    
    Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 57 +++++++++++++++-------
 .../main/scala/kafka/server/ReplicaManager.scala   | 15 +++---
 .../admin/ReassignPartitionsClusterTest.scala      | 27 +++++++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 52 +++++++++++++++++++-
 4 files changed, 117 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 203cc62..08ad47a 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -219,7 +219,7 @@ abstract class AbstractFetcherThread(name: String,
         curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
       }
 
-      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets)
+      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets,
latestEpochsForPartitions)
       handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
       updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
     }
@@ -244,7 +244,8 @@ abstract class AbstractFetcherThread(name: String,
     updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
   }
 
-  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset]):
ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
+  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
+                                             latestEpochsForPartitions: Map[TopicPartition,
EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
@@ -256,7 +257,11 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          onPartitionFenced(tp)
+          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
+            p =>
+              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
+              else None
+          })) partitionsWithError += tp
 
         case error =>
           info(s"Retrying leaderEpoch request for partition $tp as the leader reported an
error: $error")
@@ -267,12 +272,22 @@ abstract class AbstractFetcherThread(name: String,
     ResultWithPartitions(fetchOffsets, partitionsWithError)
   }
 
-  private def onPartitionFenced(tp: TopicPartition): Unit = inLock(partitionMapLock) {
-    Option(partitionStates.stateValue(tp)).foreach { currentFetchState =>
+  /**
+   * remove the partition if the partition state is NOT updated. Otherwise, keep the partition
active.
+   * @return true if the epoch in this thread is updated. otherwise, false
+   */
+  private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean =
inLock(partitionMapLock) {
+    Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
       val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
-      info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader.
Will await " +
-        s"the new LeaderAndIsr state before resuming fetching.")
-      markPartitionFailed(tp)
+      if (requestEpoch.contains(currentLeaderEpoch)) {
+        info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader.
Will await " +
+          s"the new LeaderAndIsr state before resuming fetching.")
+        markPartitionFailed(tp)
+        false
+      } else {
+        info(s"Partition $tp has an new epoch ($currentLeaderEpoch) than the current leader.
retry the partition later")
+        true
+      }
     }
   }
 
@@ -309,6 +324,10 @@ abstract class AbstractFetcherThread(name: String,
             // the current offset is the same as the offset requested.
             val fetchState = fetchStates(topicPartition)
             if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch)
{
+              val requestEpoch = if (fetchState.currentLeaderEpoch >= 0)
+                Some(fetchState.currentLeaderEpoch)
+              else
+                None
               partitionData.error match {
                 case Errors.NONE =>
                   try {
@@ -351,7 +370,7 @@ abstract class AbstractFetcherThread(name: String,
                       markPartitionFailed(topicPartition)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
-                  if (!handleOutOfRangeError(topicPartition, currentFetchState))
+                  if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
                     partitionsWithError += topicPartition
 
                 case Errors.UNKNOWN_LEADER_EPOCH =>
@@ -360,7 +379,7 @@ abstract class AbstractFetcherThread(name: String,
                   partitionsWithError += topicPartition
 
                 case Errors.FENCED_LEADER_EPOCH =>
-                  onPartitionFenced(topicPartition)
+                  if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError
+= topicPartition
 
                 case Errors.NOT_LEADER_FOR_PARTITION =>
                   debug(s"Remote broker is not the leader for partition $topicPartition,
which could indicate " +
@@ -522,32 +541,34 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-   * Handle the out of range error. Return true if the request succeeded or was fenced, which
means we need
-   * not backoff and retry. False if there was a retriable error.
+   * Handle the out of range error. Return false if
+   * 1) the request succeeded or
+   * 2) was fenced and this thread haven't received new epoch,
+   * which means we need not backoff and retry. True if there was a retriable error.
    */
   private def handleOutOfRangeError(topicPartition: TopicPartition,
-                                    fetchState: PartitionFetchState): Boolean = {
+                                    fetchState: PartitionFetchState,
+                                    requestEpoch: Option[Int]): Boolean = {
     try {
       val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
       val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state
= Fetching)
       partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
       info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is "
+
         s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
-      true
+      false
     } catch {
       case _: FencedLeaderEpochException =>
-        onPartitionFenced(topicPartition)
-        true
+        onPartitionFenced(topicPartition, requestEpoch)
 
       case e @ (_ : UnknownTopicOrPartitionException |
                 _ : UnknownLeaderEpochException |
                 _ : NotLeaderForPartitionException) =>
         info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
-        false
+        true
 
       case e: Throwable =>
         error(s"Error getting offset for partition $topicPartition", e)
-        false
+        true
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9b83090..99e20f1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1063,14 +1063,10 @@ class ReplicaManager(val config: KafkaConfig,
 
         // First check partition's leader epoch
         val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
-        val newPartitions = new mutable.HashSet[Partition]
+        val updatedPartitions = new mutable.HashSet[Partition]
 
         leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo)
=>
-          val partition = getPartition(topicPartition).getOrElse {
-            val createdPartition = getOrCreatePartition(topicPartition)
-            newPartitions.add(createdPartition)
-            createdPartition
-          }
+          val partition = getOrCreatePartition(topicPartition)
           val currentLeaderEpoch = partition.getLeaderEpoch
           val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
           if (partition eq ReplicaManager.OfflinePartition) {
@@ -1082,9 +1078,10 @@ class ReplicaManager(val config: KafkaConfig,
           } else if (requestLeaderEpoch > currentLeaderEpoch) {
             // If the leader epoch is valid record the epoch of the controller that made
the leadership decision.
             // This is useful while updating the isr to maintain the decision maker controller's
epoch in the zookeeper path
-            if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
+            if (stateInfo.basePartitionState.replicas.contains(localBrokerId)) {
+              updatedPartitions.add(partition)
               partitionState.put(partition, stateInfo)
-            else {
+            } else {
               stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId
with " +
                 s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition
as itself is not " +
                 s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
@@ -1139,7 +1136,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
-        for (partition <- newPartitions) {
+        for (partition <- updatedPartitions) {
           val topicPartition = partition.topicPartition
           if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
             partition.localReplica.foreach { replica =>
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index f629707..01ab41f 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -66,9 +66,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging
{
     JAdminClient.create(props)
   }
 
-  def getRandomLogDirAssignment(brokerId: Int): String = {
+  def getRandomLogDirAssignment(brokerId: Int, excluded: Option[String] = None): String =
{
     val server = servers.find(_.config.brokerId == brokerId).get
-    val logDirs = server.config.logDirs
+    val logDirs = server.config.logDirs.filterNot(excluded.contains)
     new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
   }
 
@@ -134,18 +134,33 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
   }
 
   @Test
-  def shouldMoveSinglePartitionWithinBroker() {
+  def shouldMoveSinglePartitionToSameFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(true)
+
+  @Test
+  def shouldMoveSinglePartitionToDifferentFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(false)
+
+  def shouldMoveSinglePartitionWithinBroker(moveToSameFolder: Boolean): Unit = {
     // Given a single replica on server 100
     startBrokers(Seq(100, 101))
     adminClient = createAdminClient(servers)
-    val expectedLogDir = getRandomLogDirAssignment(100)
     createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers)
 
+    val replica = new TopicPartitionReplica(topicName, 0, 100)
+    val currentLogDir = adminClient.describeReplicaLogDirs(java.util.Collections.singleton(replica))
+      .all()
+      .get()
+      .get(replica)
+      .getCurrentReplicaLogDir
+
+    val expectedLogDir = if (moveToSameFolder)
+      currentLogDir
+    else
+      getRandomLogDirAssignment(100, excluded = Some(currentLogDir))
+
     // When we execute an assignment that moves an existing replica to another log directory
on the same broker
     val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[100],"log_dirs":["$expectedLogDir"]}]}"""
     ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
-    val replica = new TopicPartitionReplica(topicName, 0, 100)
-    TestUtils.waitUntilTrue(() => {
+    waitUntilTrue(() => {
       expectedLogDir == adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir
     }, "Partition should have been moved to the expected log directory", 1000)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1c1cbd6..74466b4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -195,6 +195,55 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testFencedErrorCausedByBecomeLeader(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.getOrCreatePartition(topicPartition)
+          .getOrCreateReplica(0, isNew = false)
+
+      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Map(topicPartition -> new LeaderAndIsrRequest.PartitionState(0, 0,
+          epoch, brokerList, 0, brokerList, true)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
+      ).build()
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0),
expectLeader = true)
+        .localReplica.flatMap(_.log).get
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)
+
+      // find the live and different folder
+      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
+      assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+      assertTrue(replicaManager.futureLocalReplica(topicPartition).flatMap(_.log).isDefined)
+
+      assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      // change the epoch from 0 to 1 in order to make fenced error
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
+      TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount()
== 0),
+        s"the partition=$topicPartition should be removed from pending state")
+      // the partition is added to failedPartitions if fenced error happens
+      // if the thread is done before ReplicaManager#becomeLeaderOrFollower updates epoch,the
fenced error does
+      // not happen and failedPartitions is empty.
+      if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
+        replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+        // send request again
+        replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+        // the future folder exists so it fails to invoke thread
+        assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      }
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
   def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
     val timer = new MockTimer
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
@@ -636,6 +685,7 @@ class ReplicaManagerTest {
       EasyMock.expect(mockLogMgr.truncateTo(Map(new TopicPartition(topic, topicPartition)
-> offsetFromLeader),
         isFuture = false)).once
     }
+    EasyMock.expect(mockLogMgr.getLog(new TopicPartition(topic, topicPartition), isFuture
= true)).andReturn(None)
     EasyMock.replay(mockLogMgr)
 
     val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
@@ -798,7 +848,7 @@ class ReplicaManagerTest {
 
   private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds:
Seq[Int] = Seq(0, 1)): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))


Mime
View raw message