kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Small cleanups in `AlterIsr` handling logic (#9663)
Date Tue, 01 Dec 2020 19:18:03 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8839514  MINOR: Small cleanups in `AlterIsr` handling logic (#9663)
8839514 is described below

commit 8839514efb9f037462541b8c1957a47ec19a1565
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Dec 1 11:17:08 2020 -0800

    MINOR: Small cleanups in `AlterIsr` handling logic (#9663)
    
    A few small cleanups in `Partition` handling of `AlterIsr`:
    
    - Factor state update and log message into `sendAlterIsrRequest`
    - Ensure illegal state error gets raised if a retry fails to be enqueued
    - Always check the proposed state against the current state in `handleAlterIsrResponse`
    - Add `toString` implementations to `IsrState` case classes
    
    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 100 +++++++++++++---------
 1 file changed, 61 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 7b5d6b7..f6a9e83 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -181,21 +181,47 @@ sealed trait IsrState {
   def isInflight: Boolean
 }
 
-case class PendingExpandIsr(isr: Set[Int], newInSyncReplicaId: Int) extends IsrState {
+case class PendingExpandIsr(
+  isr: Set[Int],
+  newInSyncReplicaId: Int
+) extends IsrState {
   val maximalIsr = isr + newInSyncReplicaId
   val isInflight = true
+
+  override def toString: String = {
+    s"PendingExpandIsr(isr=$isr" +
+      s", newInSyncReplicaId=$newInSyncReplicaId" +
+      ")"
+  }
 }
 
-case class PendingShrinkIsr(isr: Set[Int], outOfSyncReplicaIds: Set[Int]) extends IsrState  {
+case class PendingShrinkIsr(
+  isr: Set[Int],
+  outOfSyncReplicaIds: Set[Int]
+) extends IsrState  {
   val maximalIsr = isr
   val isInflight = true
+
+  override def toString: String = {
+    s"PendingShrinkIsr(isr=$isr" +
+      s", outOfSyncReplicaIds=$outOfSyncReplicaIds" +
+      ")"
+  }
 }
 
-case class CommittedIsr(isr: Set[Int]) extends IsrState {
+case class CommittedIsr(
+  isr: Set[Int]
+) extends IsrState {
   val maximalIsr = isr
   val isInflight = false
+
+  override def toString: String = {
+    s"CommittedIsr(isr=$isr" +
+      ")"
+  }
 }
 
+
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  *
@@ -1286,14 +1312,7 @@ class Partition(val topicPartition: TopicPartition,
     if (!isrState.isInflight) {
       // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
       // a more constrained state for advancing the HW.
-      val proposedIsrState = PendingExpandIsr(isrState.isr, newInSyncReplica)
-      if (sendAlterIsrRequest(proposedIsrState)) {
-        // Only update our ISR state of AlterIsrManager accepts our update
-        debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated to [${isrState.maximalIsr.mkString(",")}]")
-        isrState = proposedIsrState
-      } else {
-        throw new IllegalStateException("Failed to enqueue ISR expansion even though there was no apparent in-flight ISR changes")
-      }
+      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
     } else {
       trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
     }
@@ -1321,13 +1340,7 @@ class Partition(val topicPartition: TopicPartition,
       // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
       // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
       // the next LeaderAndIsr
-      val proposedIsrState = PendingShrinkIsr(isrState.isr, outOfSyncReplicas)
-      if (sendAlterIsrRequest(proposedIsrState)) {
-        debug(s"Removing out-of-sync replicas $outOfSyncReplicas")
-        isrState = proposedIsrState
-      } else {
-        throw new IllegalStateException("Failed to enqueue ISR shrink even though there was no apparent in-flight ISR changes")
-      }
+      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
     } else {
       trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
     }
@@ -1351,19 +1364,24 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
-    val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
-      case CommittedIsr(_) =>
-        error(s"Asked to send AlterIsr but there are no pending updates")
-        None
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
+    val isrToSend: Set[Int] = proposedIsrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
+      case state =>
+        throw new IllegalStateException(s"Invalid state $state for `AlterIsr` request for partition $topicPartition")
     }
-    isrToSendOpt.exists { isrToSend =>
-      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
-      val callbackPartial = handleAlterIsrResponse(isrToSend, _ : Either[Errors, LeaderAndIsr])
-      alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))
+
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState))
+
+    if (!alterIsrManager.enqueue(alterIsrItem)) {
+      throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
+        s"$newLeaderAndIsr for partition $topicPartition")
     }
+
+    isrState = proposedIsrState
+    debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after transition to $proposedIsrState")
   }
 
   /**
@@ -1372,23 +1390,27 @@ class Partition(val topicPartition: TopicPartition,
    * Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata
    * or LeaderAndIsr
    */
-  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+  private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Unit = {
     inWriteLock(leaderIsrUpdateLock) {
+      if (isrState != proposedIsrState) {
+        // This means isrState was updated through leader election or some other mechanism before we got the AlterIsr
+        // response. We don't know what happened on the controller exactly, but we do know this response is out of date
+        // so we ignore it.
+        debug(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState")
+        return
+      }
+
       result match {
         case Left(error: Errors) => error match {
           case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
-            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since it doesn't know about this topic or partition. Giving up.")
+            debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
           case Errors.FENCED_LEADER_EPOCH =>
-            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+            debug(s"Controller failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
           case Errors.INVALID_UPDATE_VERSION =>
-            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Giving up.")
+            debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
           case _ =>
-            if (isrState.isInflight) {
-              warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.")
-              sendAlterIsrRequest(isrState)
-            } else {
-              warn(s"Ignoring failed ISR update to ${proposedIsr.mkString(",")} since due to $error since we have a committed ISR.")
-            }
+            warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
+            sendAlterIsrRequest(proposedIsrState)
         }
         case Right(leaderAndIsr: LeaderAndIsr) =>
           // Success from controller, still need to check a few things


Mime
View raw message