kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected (#9353)
Date Wed, 14 Oct 2020 00:54:46 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8118b6c  KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected
(#9353)
8118b6c is described below

commit 8118b6c9f9c3a8cc77dca2f6a516254f03a43d5e
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Oct 13 17:53:47 2020 -0700

    KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected (#9353)
    
    Before `AlterIsr`, which was introduced in KIP-497, the controller would register watches
in Zookeeper for each reassigning partition so that it could be notified immediately when
the ISR was expanded and the reassignment could be completed. This notification is not needed
with the latest IBP when `AlterIsr` is enabled because the controller will execute all ISR
changes itself.
    
    There is one subtle detail. If we are in the middle of a roll in order to bump the IBP,
then it is possible for the controller to be on the latest IBP while some of the brokers are
still on the older one. In this case, the brokers on the older IBP will not send `AlterIsr`,
but we can still rely on the delayed notification through the `isr_notifications` path to
complete reassignments. This seems like a reasonable tradeoff since it should be a short window
before the roll is completed.
    
    Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
---
 .../scala/kafka/controller/KafkaController.scala   | 65 +++++++++++++-----
 .../main/scala/kafka/server/ReplicaManager.scala   | 36 +++++++---
 .../admin/ReassignPartitionsIntegrationTest.scala  | 78 +++++++++++++++++-----
 3 files changed, 139 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 47b52c6..cc66060 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -84,6 +84,7 @@ class KafkaController(val config: KafkaConfig,
   @volatile private var brokerInfo = initialBrokerInfo
   @volatile private var _brokerEpoch = initialBrokerEpoch
 
+  private val isAlterIsrEnabled = config.interBrokerProtocolVersion >= KAFKA_2_7_IV2
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext
= true, None)
   val controllerContext = new ControllerContext
   var controllerChannelManager = new ControllerChannelManager(controllerContext, config,
time, metrics,
@@ -789,8 +790,10 @@ class KafkaController(val config: KafkaConfig,
         stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
     }
 
-    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager,
topicPartition)
-    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+    if (!isAlterIsrEnabled) {
+      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager,
topicPartition)
+      zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+    }
 
     controllerContext.partitionsBeingReassigned.add(topicPartition)
   }
@@ -1089,17 +1092,21 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
-    controllerContext.partitionsBeingReassigned.foreach { tp =>
-      val path = TopicPartitionStateZNode.path(tp)
-      zkClient.unregisterZNodeChangeHandler(path)
+    if (!isAlterIsrEnabled) {
+      controllerContext.partitionsBeingReassigned.foreach { tp =>
+        val path = TopicPartitionStateZNode.path(tp)
+        zkClient.unregisterZNodeChangeHandler(path)
+      }
     }
   }
 
   private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition,
                                                        assignment: ReplicaAssignment): Unit
= {
     if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-      val path = TopicPartitionStateZNode.path(topicPartition)
-      zkClient.unregisterZNodeChangeHandler(path)
+      if (!isAlterIsrEnabled) {
+        val path = TopicPartitionStateZNode.path(topicPartition)
+        zkClient.unregisterZNodeChangeHandler(path)
+      }
       maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition &&
replicas == assignment.replicas)
       controllerContext.partitionsBeingReassigned.remove(topicPartition)
     } else {
@@ -1830,13 +1837,17 @@ class KafkaController(val config: KafkaConfig,
     if (!isActive) return
 
     if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-      val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
-      if (isReassignmentComplete(topicPartition, reassignment)) {
-        // resume the partition reassignment process
-        info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the
leader for " +
-          s"reassigning partition $topicPartition")
-        onPartitionReassignment(topicPartition, reassignment)
-      }
+      maybeCompleteReassignment(topicPartition)
+    }
+  }
+
+  private def maybeCompleteReassignment(topicPartition: TopicPartition): Unit = {
+    val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+    if (isReassignmentComplete(topicPartition, reassignment)) {
+      // resume the partition reassignment process
+      info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader
for " +
+        s"reassigning partition $topicPartition")
+      onPartitionReassignment(topicPartition, reassignment)
     }
   }
 
@@ -2073,6 +2084,16 @@ class KafkaController(val config: KafkaConfig,
       if (partitions.nonEmpty) {
         updateLeaderAndIsrCache(partitions)
         processUpdateNotifications(partitions)
+
+        // During a partial upgrade, the controller may be on an IBP which assumes
+        // ISR changes through the `AlterIsr` API while some brokers are on an older
+        // IBP which assumes notification through Zookeeper. In this case, since the
+        // controller will not have registered watches for reassigning partitions, we
+        // can still rely on the batch ISR change notification path in order to
+        // complete the reassignment.
+        partitions.filter(controllerContext.partitionsBeingReassigned.contains).foreach {
topicPartition =>
+          maybeCompleteReassignment(topicPartition)
+        }
       }
     } finally {
       // delete the notifications
@@ -2227,7 +2248,8 @@ class KafkaController(val config: KafkaConfig,
     eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch,
isrsToAlter, responseCallback))
   }
 
-  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition,
LeaderAndIsr],
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long,
+                              isrsToAlter: Map[TopicPartition, LeaderAndIsr],
                               callback: AlterIsrCallback): Unit = {
 
     // Handle a few short-circuits
@@ -2315,6 +2337,19 @@ class KafkaController(val config: KafkaConfig,
     }
 
     callback.apply(response)
+
+    // After we have returned the result of the `AlterIsr` request, we should check whether
+    // there are any reassignments which can be completed by a successful ISR expansion.
+    response.left.foreach { alterIsrResponses =>
+      alterIsrResponses.forKeyValue { (topicPartition, partitionResponse) =>
+        if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+          val isSuccessfulUpdate = partitionResponse.isRight
+          if (isSuccessfulUpdate) {
+            maybeCompleteReassignment(topicPartition)
+          }
+        }
+      }
+    }
   }
 
   private def processControllerChange(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c080d6a..9c7307b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -164,10 +164,26 @@ object HostedPartition {
   final object Offline extends HostedPartition
 }
 
+case class IsrChangePropagationConfig(
+  // How often to check for ISR
+  checkIntervalMs: Long,
+
+  // Maximum time that an ISR change may be delayed before sending the notification
+  maxDelayMs: Long,
+
+  // Maximum time to await additional changes before sending the notification
+  lingerMs: Long
+)
+
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
-  val IsrChangePropagationBackoff = 5000L
-  val IsrChangePropagationInterval = 60000L
+
+  // This field is mutable to allow overriding change notification behavior in test cases
+  @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
+    checkIntervalMs = 2500,
+    lingerMs = 5000,
+    maxDelayMs = 60000,
+  )
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -233,9 +249,10 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
   private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext
= false, None)
 
+  private val isrChangeNotificationConfig = ReplicaManager.DefaultIsrPropagationConfig
   private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
-  private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
-  private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
+  private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
+  private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
 
   private var logDirFailureHandler: LogDirFailureHandler = null
 
@@ -278,7 +295,7 @@ class ReplicaManager(val config: KafkaConfig,
   def recordIsrChange(topicPartition: TopicPartition): Unit = {
     isrChangeSet synchronized {
       isrChangeSet += topicPartition
-      lastIsrChangeMs.set(System.currentTimeMillis())
+      lastIsrChangeMs.set(time.milliseconds())
     }
   }
   /**
@@ -289,11 +306,11 @@ class ReplicaManager(val config: KafkaConfig,
    * other brokers when large amount of ISR change occurs.
    */
   def maybePropagateIsrChanges(): Unit = {
-    val now = System.currentTimeMillis()
+    val now = time.milliseconds()
     isrChangeSet synchronized {
       if (isrChangeSet.nonEmpty &&
-        (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBackoff < now ||
-          lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now))
{
+        (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
+          lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now))
{
         zkClient.propagateIsrChanges(isrChangeSet)
         isrChangeSet.clear()
         lastIsrPropagationMs.set(now)
@@ -324,7 +341,8 @@ class ReplicaManager(val config: KafkaConfig,
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs
/ 2, unit = TimeUnit.MILLISECONDS)
     // If using AlterIsr, we don't need the znode ISR propagation
     if (config.interBrokerProtocolVersion < KAFKA_2_7_IV2) {
-      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L,
unit = TimeUnit.MILLISECONDS)
+      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
+        period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
     } else {
       alterIsrManager.start()
     }
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index aeb5ac7..6d5d02d 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -21,23 +21,24 @@ import java.io.Closeable
 import java.util.{Collections, HashMap, List}
 
 import kafka.admin.ReassignPartitionsCommand._
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.api.KAFKA_2_7_IV1
+import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ReplicaManager}
+import kafka.utils.Implicits._
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry,
DescribeLogDirsResult, NewTopic}
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
-import org.junit.rules.Timeout
+import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.rules.Timeout
 import org.junit.{After, Rule, Test}
 
-import scala.collection.Map
+import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
-import scala.collection.{Seq, mutable}
 
 class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
   @Rule
@@ -45,10 +46,6 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
 
   var cluster: ReassignPartitionsTestCluster = null
 
-  def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(5, zkConnect).map(KafkaConfig.fromProps)
-  }
-
   @After
   override def tearDown(): Unit = {
     Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster")
@@ -60,13 +57,53 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
       brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
     }.toMap
 
-  /**
-   * Test running a quick reassignment.
-   */
+
   @Test
   def testReassignment(): Unit = {
     cluster = new ReassignPartitionsTestCluster(zkConnect)
     cluster.setup()
+    executeAndVerifyReassignment()
+  }
+
+  @Test
+  def testReassignmentWithAlterIsrDisabled(): Unit = {
+    // Test reassignment when the IBP is on an older version which does not use
+    // the `AlterIsr` API. In this case, the controller will register individual
+    // watches for each reassigning partition so that the reassignment can be
+    // completed as soon as the ISR is expanded.
+    val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
+    cluster = new ReassignPartitionsTestCluster(zkConnect, configOverrides = configOverrides)
+    cluster.setup()
+    executeAndVerifyReassignment()
+  }
+
+  @Test
+  def testReassignmentCompletionDuringPartialUpgrade(): Unit = {
+    // Test reassignment during a partial upgrade when some brokers are relying on
+    // `AlterIsr` and some rely on the old notification logic through Zookeeper.
+    // In this test case, broker 0 starts up first on the latest IBP and is typically
+    // elected as controller. The three remaining brokers start up on the older IBP.
+    // We want to ensure that reassignment can still complete through the ISR change
+    // notification path even though the controller expects `AlterIsr`.
+
+    // Override change notification settings so that test is not delayed by ISR
+    // change notification delay
+    ReplicaManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
+      checkIntervalMs = 500,
+      lingerMs = 100,
+      maxDelayMs = 500
+    )
+
+    val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
+    val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> oldIbpConfig)
+
+    cluster = new ReassignPartitionsTestCluster(zkConnect, brokerConfigOverrides = brokerConfigOverrides)
+    cluster.setup()
+
+    executeAndVerifyReassignment()
+  }
+
+  def executeAndVerifyReassignment(): Unit = {
     val assignment = """{"version":1,"partitions":""" +
       """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},"""
+
       """{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}"""
+
@@ -594,7 +631,11 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness
{
     }
   }
 
-  class ReassignPartitionsTestCluster(val zkConnect: String) extends Closeable {
+  class ReassignPartitionsTestCluster(
+    val zkConnect: String,
+    configOverrides: Map[String, String] = Map.empty,
+    brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
+  ) extends Closeable {
     val brokers = Map(
       0 -> "rack0",
       1 -> "rack0",
@@ -622,6 +663,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness
{
         // Don't move partition leaders automatically.
         config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
         config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
+        configOverrides.forKeyValue(config.setProperty)
+
+        brokerConfigOverrides.get(brokerId).foreach { overrides =>
+          overrides.forKeyValue(config.setProperty)
+        }
+
         config
     }.toBuffer
 
@@ -637,9 +684,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     }
 
     def createServers(): Unit = {
-      brokers.keySet.foreach {
-        case brokerId =>
-          servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
+      brokers.keySet.foreach { brokerId =>
+        servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
       }
     }
 


Mime
View raw message