kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-5107; remove preferred replica election state from ControllerContext
Date Tue, 02 May 2017 15:41:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 46da01a4a -> b2fcf73b6


KAFKA-5107; remove preferred replica election state from ControllerContext

KAFKA-5028 moved the controller to a single-threaded model, so other work can no longer be
interleaved with a preferred replica leader election, meaning we don't need to keep its state.

This patch additionally addresses a bug introduced by KAFKA-5028, which made onPreferredReplicaElection
keep the line calling topicDeletionManager.markTopicIneligibleForDeletion but removed the
line calling topicDeletionManager.resumeDeletionForTopics.

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #2927 from onurkaraman/KAFKA-5107


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2fcf73b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2fcf73b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2fcf73b

Branch: refs/heads/trunk
Commit: b2fcf73b68d7ad3b0b4fb90371298e7163c630c6
Parents: 46da01a
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Tue May 2 08:41:19 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue May 2 08:41:19 2017 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 60 +++++++-------------
 .../kafka/controller/TopicDeletionManager.scala | 39 ++++++-------
 2 files changed, 42 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b2fcf73b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e7e190f..be4dd53 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -50,7 +50,6 @@ class ControllerContext(val zkUtils: ZkUtils) {
   var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
   var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch]
= mutable.Map.empty
   val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext]
= new mutable.HashMap
-  val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new
mutable.HashSet
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -154,7 +153,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   // have a separate scheduler for the controller to be able to start and stop independently
of the
   // kafka server
   private val kafkaScheduler = new KafkaScheduler(1)
-  var topicDeletionManager: TopicDeletionManager = null
+  val topicDeletionManager: TopicDeletionManager = new TopicDeletionManager(this)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -247,6 +246,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     registerBrokerChangeListener()
 
     initializeControllerContext()
+    val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
+    topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
 
     // We need to send UpdateMetadataRequest after the controller context is initialized
and before the state machines
     // are started. The is because brokers need to receive the list of live brokers from
UpdateMetadataRequest before
@@ -261,13 +262,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
     controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
     info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId,
epoch))
     maybeTriggerPartitionReassignment()
-    maybeTriggerPreferredReplicaElection()
+    topicDeletionManager.tryTopicDeletion()
+    val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
+    onPreferredReplicaElection(pendingPreferredReplicaElections)
     info("starting the controller scheduler")
     kafkaScheduler.startup()
     if (config.autoLeaderRebalanceEnable) {
       scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
     }
-    topicDeletionManager.start()
   }
 
   private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
@@ -287,9 +289,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     deregisterPartitionReassignmentListener()
     deregisterPreferredReplicaElectionListener()
 
-    // shutdown delete topic manager
-    if (topicDeletionManager != null)
-      topicDeletionManager.shutdown()
+    // reset topic deletion manager
+    topicDeletionManager.reset()
 
     // shutdown leader rebalance scheduler
     kafkaScheduler.shutdown()
@@ -573,8 +574,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance:
Boolean = false) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
-      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
-      topicDeletionManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
       partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for
partitions %s".format(partitions.mkString(",")), e)
@@ -658,16 +657,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
     updateLeaderAndIsrCache()
     // start the channel manager
     startChannelManager()
-    initializePreferredReplicaElection()
     initializePartitionReassignment()
-    initializeTopicDeletion()
     info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
     info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
   }
 
-  private def initializePreferredReplicaElection() {
-    // initialize preferred replica election state
+  private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
     val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection()
     // check if they are already completed or topic was deleted
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter
{ partition =>
@@ -677,11 +673,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
         if(!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
== replicasOpt.get.head else false
       successful || topicDeleted
     }
-    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
-    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
+    val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection
-- partitionsThatCompletedPreferredReplicaElection
+    val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition
=> topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
+    val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion
-- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
     info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
     info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
-    info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+    info("Skipping preferred replica election for partitions due to topic deletion: %s".format(pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")))
+    info("Resuming preferred replica election for partitions: %s".format(pendingPreferredReplicaElections.mkString(",")))
+    pendingPreferredReplicaElections
   }
 
   private def initializePartitionReassignment() {
@@ -704,18 +703,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
     info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
   }
 
-  private def initializeTopicDeletion() {
-    val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
+  private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
+    val topicsToBeDeleted = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
     val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter
{ case (_, replicas) =>
       replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
-    val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
-    val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress
|
-                                  topicsForWhichPreferredReplicaElectionIsInProgress
-    info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
+    val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress
+    info("List of topics to be deleted: %s".format(topicsToBeDeleted.mkString(",")))
     info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
-    // initialize the topic deletion manager
-    topicDeletionManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
+    (topicsToBeDeleted, topicsIneligibleForDeletion)
   }
 
   private def maybeTriggerPartitionReassignment() {
@@ -724,10 +720,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
     }
   }
 
-  private def maybeTriggerPreferredReplicaElection() {
-    onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
-  }
-
   private def startChannelManager() {
     controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext,
config, time, metrics, threadNamePrefix)
     controllerContext.controllerChannelManager.startup()
@@ -960,7 +952,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     }
     if (!isTriggeredByAutoRebalance)
       zkUtils.deletePath(ZkUtils.PreferredReplicaLeaderElectionPath)
-    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
   /**
@@ -1128,7 +1119,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
             // and preferred replica election is not in progress
             if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                 controllerContext.partitionsBeingReassigned.isEmpty &&
-                controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
                 !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                 controllerContext.allTopics.contains(topicPartition.topic)) {
               onPreferredReplicaElection(Set(topicPartition), true)
@@ -1252,11 +1242,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
           info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
           // mark topic ineligible for deletion if other state changes are in progress
           topicsToBeDeleted.foreach { topic =>
-            val preferredReplicaElectionInProgress =
-              controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
             val partitionReassignmentInProgress =
               controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
-            if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
+            if (partitionReassignmentInProgress)
               topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
           }
           // add topic to deletion list
@@ -1376,13 +1364,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
 
   }
 
-  case class PreferredReplicaLeaderElection(partitionsForPreferredReplicaElection: Set[TopicAndPartition])
extends ControllerEvent {
+  case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent
{
     override def process(): Unit = {
       if (!isActive) return
-      if (controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
-        info("These partitions are already undergoing preferred replica election: %s"
-          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
-      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
       val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
       if (partitionsForTopicsToBeDeleted.nonEmpty) {
         error("Skipping preferred replica election for partitions %s since the respective
topics are being deleted"

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2fcf73b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index bc1a03e..b1f98b5 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -56,32 +56,34 @@ import scala.collection.{Set, mutable}
  *    if no replica is in TopicDeletionStarted state and at least one replica is in TopicDeletionFailed
state, then
  *    it marks the topic for deletion retry.
  * @param controller
- * @param initialTopicsToBeDeleted The topics that are queued up for deletion in zookeeper
at the time of controller failover
- * @param initialTopicsIneligibleForDeletion The topics ineligible for deletion due to any
of the conditions mentioned in #3 above
  */
-class TopicDeletionManager(controller: KafkaController, initialTopicsToBeDeleted: Set[String]
= Set.empty, initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging
{
+class TopicDeletionManager(controller: KafkaController) extends Logging {
   this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
   val controllerContext = controller.controllerContext
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
-  val topicsToBeDeleted: mutable.Set[String] = if (isDeleteTopicEnabled) {
-    mutable.Set.empty[String] ++ initialTopicsToBeDeleted
-  } else {
-    // if delete topic is disabled clean the topic entries under /admin/delete_topics
-    val zkUtils = controllerContext.zkUtils
-    for (topic <- initialTopicsToBeDeleted) {
-      val deleteTopicPath = getDeleteTopicPath(topic)
-      info("Removing " + deleteTopicPath + " since delete topic is disabled")
-      zkUtils.zkClient.delete(deleteTopicPath)
+  val topicsToBeDeleted = mutable.Set.empty[String]
+  val partitionsToBeDeleted = mutable.Set.empty[TopicAndPartition]
+  val topicsIneligibleForDeletion = mutable.Set.empty[String]
+
+  def init(initialTopicsToBeDeleted: Set[String], initialTopicsIneligibleForDeletion: Set[String]):
Unit = {
+    if (isDeleteTopicEnabled) {
+      topicsToBeDeleted ++= initialTopicsToBeDeleted
+      partitionsToBeDeleted ++= topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
+      topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & topicsToBeDeleted
+    } else {
+      // if delete topic is disabled clean the topic entries under /admin/delete_topics
+      val zkUtils = controllerContext.zkUtils
+      for (topic <- initialTopicsToBeDeleted) {
+        val deleteTopicPath = getDeleteTopicPath(topic)
+        info("Removing " + deleteTopicPath + " since delete topic is disabled")
+        zkUtils.zkClient.delete(deleteTopicPath)
+      }
     }
-    mutable.Set.empty[String]
   }
-  val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
-    (initialTopicsIneligibleForDeletion & topicsToBeDeleted)
-  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
 
-  def start() {
+  def tryTopicDeletion(): Unit = {
     if (isDeleteTopicEnabled) {
       resumeDeletions()
     }
@@ -90,8 +92,7 @@ class TopicDeletionManager(controller: KafkaController, initialTopicsToBeDeleted
   /**
    * Invoked when the current controller resigns. At this time, all state for topic deletion
should be cleared.
    */
-  def shutdown() {
-    // Only allow one shutdown to go through
+  def reset() {
     if (isDeleteTopicEnabled) {
       topicsToBeDeleted.clear()
       partitionsToBeDeleted.clear()


Mime
View raw message