kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.8 updated: MINOR: Add RaftReplicaManager (#10069)
Date Wed, 10 Feb 2021 22:44:04 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new f8176a3  MINOR: Add RaftReplicaManager (#10069)
f8176a3 is described below

commit f8176a31615c82946fe9835143c6fa5d27023583
Author: Ron Dagostino <rdagostino@confluent.io>
AuthorDate: Wed Feb 10 17:37:16 2021 -0500

    MINOR: Add RaftReplicaManager (#10069)
    
    This adds the logic to apply partition metadata when consuming from the Raft-based
    metadata log.
    
    RaftReplicaManager extends ReplicaManager for now to minimize changes to existing
    code for the 2.8 release. We will likely adjust this hierarchy at a later time (e.g. introducing
    a trait and adding a helper to refactor common code). For now, we expose the necessary
    fields and methods in ReplicaManager by changing their scope from private to protected,
    and we refactor out a couple of pieces of logic that are shared between the two
    implementation (stopping replicas and adding log dir fetchers).
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
---
 .../scala/kafka/server/DelayedDeleteRecords.scala  |   2 +-
 .../kafka/server/RaftReplicaChangeDelegate.scala   | 246 ++++++++++++++
 .../scala/kafka/server/RaftReplicaManager.scala    | 377 +++++++++++++++++++++
 .../main/scala/kafka/server/ReplicaManager.scala   | 198 +++++------
 .../kafka/server/metadata/MetadataImage.scala      |   7 +-
 .../unit/kafka/server/RaftReplicaManagerTest.scala | 238 +++++++++++++
 6 files changed, 961 insertions(+), 107 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 164cab5..dae17c6 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -84,7 +84,7 @@ class DelayedDeleteRecords(delayMs: Long,
                 (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
             }
 
-          case HostedPartition.Deferred(_) =>
+          case _: HostedPartition.Deferred =>
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
 
           case HostedPartition.Offline =>
diff --git a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
new file mode 100644
index 0000000..0b7ee42
--- /dev/null
+++ b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.Log
+import kafka.server.checkpoints.OffsetCheckpoints
+import kafka.server.metadata.{MetadataBrokers, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+
+import scala.collection.{Map, Set, mutable}
+
+trait RaftReplicaChangeDelegateHelper {
+  def stateChangeLogger: StateChangeLogger
+  def replicaFetcherManager: ReplicaFetcherManager
+  def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager
+  def markDeferred(state: HostedPartition.Deferred): Unit
+  def getLogDir(topicPartition: TopicPartition): Option[String]
+  def error(msg: => String, e: => Throwable): Unit
+  def markOffline(topicPartition: TopicPartition): Unit
+  def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit
+  def isShuttingDown: Boolean
+  def initialFetchOffset(log: Log): Long
+  def config: KafkaConfig
+}
+
+class RaftReplicaChangeDelegate(helper: RaftReplicaChangeDelegateHelper) {
+  def makeDeferred(partitionsNewMap: Map[Partition, Boolean],
+                   metadataOffset: Long): Unit = {
+    val traceLoggingEnabled = helper.stateChangeLogger.isTraceEnabled
+    if (traceLoggingEnabled)
+      partitionsNewMap.forKeyValue { (partition, isNew) =>
+        helper.stateChangeLogger.trace(s"Metadata batch $metadataOffset: starting the " +
+          s"become-deferred transition for partition ${partition.topicPartition} isNew=$isNew")
+      }
+
+    // Stop fetchers for all the partitions
+    helper.replicaFetcherManager.removeFetcherForPartitions(partitionsNewMap.keySet.map(_.topicPartition))
+    helper.stateChangeLogger.info(s"Metadata batch $metadataOffset: as part of become-deferred request, " +
+      s"stopped any fetchers for ${partitionsNewMap.size} partitions")
+    // mark all the partitions as deferred
+    partitionsNewMap.forKeyValue((partition, isNew) => helper.markDeferred(HostedPartition.Deferred(partition, isNew)))
+
+    helper.replicaFetcherManager.shutdownIdleFetcherThreads()
+    helper.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+
+    if (traceLoggingEnabled)
+      partitionsNewMap.keys.foreach { partition =>
+        helper.stateChangeLogger.trace(s"Completed batch $metadataOffset become-deferred " +
+          s"transition for partition ${partition.topicPartition}")
+      }
+  }
+
+  def makeLeaders(prevPartitionsAlreadyExisting: Set[MetadataPartition],
+                  partitionStates: Map[Partition, MetadataPartition],
+                  highWatermarkCheckpoints: OffsetCheckpoints,
+                  metadataOffset: Option[Long]): Set[Partition] = {
+    val partitionsMadeLeaders = mutable.Set[Partition]()
+    val traceLoggingEnabled = helper.stateChangeLogger.isTraceEnabled
+    val deferredBatches = metadataOffset.isEmpty
+    val topLevelLogPrefix = if (deferredBatches)
+      "Metadata batch <multiple deferred>"
+    else
+      s"Metadata batch ${metadataOffset.get}"
+    try {
+      // First stop fetchers for all the partitions
+      helper.replicaFetcherManager.removeFetcherForPartitions(partitionStates.keySet.map(_.topicPartition))
+      helper.stateChangeLogger.info(s"$topLevelLogPrefix: stopped ${partitionStates.size} fetcher(s)")
+      // Update the partition information to be the leader
+      partitionStates.forKeyValue { (partition, state) =>
+        val topicPartition = partition.topicPartition
+        val partitionLogMsgPrefix = if (deferredBatches)
+          s"Apply deferred leader partition $topicPartition"
+        else
+          s"Metadata batch ${metadataOffset.get} $topicPartition"
+        try {
+          val isrState = state.toLeaderAndIsrPartitionState(
+            !prevPartitionsAlreadyExisting(state))
+          if (partition.makeLeader(isrState, highWatermarkCheckpoints)) {
+            partitionsMadeLeaders += partition
+            if (traceLoggingEnabled) {
+              helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: completed the become-leader state change.")
+            }
+          } else {
+            helper.stateChangeLogger.info(s"$partitionLogMsgPrefix: skipped the " +
+              "become-leader state change since it is already the leader.")
+          }
+        } catch {
+          case e: KafkaStorageException =>
+            helper.stateChangeLogger.error(s"$partitionLogMsgPrefix: unable to make " +
+              s"leader because the replica for the partition is offline due to disk error $e")
+            val dirOpt = helper.getLogDir(topicPartition)
+            helper.error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
+            helper.markOffline(topicPartition)
+        }
+      }
+    } catch {
+      case e: Throwable =>
+        helper.stateChangeLogger.error(s"$topLevelLogPrefix: error while processing batch.", e)
+        // Re-throw the exception for it to be caught in BrokerMetadataListener
+        throw e
+    }
+    partitionsMadeLeaders
+  }
+
+  def makeFollowers(prevPartitionsAlreadyExisting: Set[MetadataPartition],
+                    currentBrokers: MetadataBrokers,
+                    partitionStates: Map[Partition, MetadataPartition],
+                    highWatermarkCheckpoints: OffsetCheckpoints,
+                    metadataOffset: Option[Long]): Set[Partition] = {
+    val traceLoggingEnabled = helper.stateChangeLogger.isTraceEnabled
+    val deferredBatches = metadataOffset.isEmpty
+    val topLevelLogPrefix = if (deferredBatches)
+      "Metadata batch <multiple deferred>"
+    else
+      s"Metadata batch ${metadataOffset.get}"
+    if (traceLoggingEnabled) {
+      partitionStates.forKeyValue { (partition, state) =>
+        val topicPartition = partition.topicPartition
+        val partitionLogMsgPrefix = if (deferredBatches)
+          s"Apply deferred follower partition $topicPartition"
+        else
+          s"Metadata batch ${metadataOffset.get} $topicPartition"
+        helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: starting the " +
+          s"become-follower transition with leader ${state.leaderId}")
+      }
+    }
+
+    val partitionsMadeFollower: mutable.Set[Partition] = mutable.Set()
+    // all brokers, including both alive and not
+    val acceptableLeaderBrokerIds = currentBrokers.iterator().map(broker => broker.id).toSet
+    val allBrokersByIdMap = currentBrokers.iterator().map(broker => broker.id -> broker).toMap
+    try {
+      partitionStates.forKeyValue { (partition, state) =>
+        val topicPartition = partition.topicPartition
+        val partitionLogMsgPrefix = if (deferredBatches)
+          s"Apply deferred follower partition $topicPartition"
+        else
+          s"Metadata batch ${metadataOffset.get} $topicPartition"
+        try {
+          val isNew = !prevPartitionsAlreadyExisting(state)
+          if (!acceptableLeaderBrokerIds.contains(state.leaderId)) {
+            // The leader broker should always be present in the metadata cache.
+            // If not, we should record the error message and abort the transition process for this partition
+            helper.stateChangeLogger.error(s"$partitionLogMsgPrefix: cannot become follower " +
+              s"since the new leader ${state.leaderId} is unavailable.")
+            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
+            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
+            partition.createLogIfNotExists(isNew, isFutureReplica = false, highWatermarkCheckpoints)
+          } else {
+            val isrState = state.toLeaderAndIsrPartitionState(isNew)
+            if (partition.makeFollower(isrState, highWatermarkCheckpoints)) {
+              partitionsMadeFollower += partition
+              if (traceLoggingEnabled) {
+                helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: completed the " +
+                  s"become-follower state change with new leader ${state.leaderId}.")
+              }
+            } else {
+              helper.stateChangeLogger.info(s"$partitionLogMsgPrefix: skipped the " +
+                s"become-follower state change since " +
+                s"the new leader ${state.leaderId} is the same as the old leader.")
+            }
+          }
+        } catch {
+          case e: KafkaStorageException =>
+            helper.stateChangeLogger.error(s"$partitionLogMsgPrefix: unable to complete the " +
+              s"become-follower state change since the " +
+              s"replica for the partition is offline due to disk error $e")
+            val dirOpt = helper.getLogDir(partition.topicPartition)
+            helper.error(s"Error while making broker the follower with leader ${state.leaderId} in dir $dirOpt", e)
+            helper.markOffline(topicPartition)
+        }
+      }
+
+      if (partitionsMadeFollower.nonEmpty) {
+        helper.replicaFetcherManager.removeFetcherForPartitions(partitionsMadeFollower.map(_.topicPartition))
+        helper.stateChangeLogger.info(s"$topLevelLogPrefix: stopped followers for ${partitionsMadeFollower.size} partitions")
+
+        partitionsMadeFollower.foreach { partition =>
+          helper.completeDelayedFetchOrProduceRequests(partition.topicPartition)
+        }
+
+        if (helper.isShuttingDown) {
+          if (traceLoggingEnabled) {
+            partitionsMadeFollower.foreach { partition =>
+              val topicPartition = partition.topicPartition
+              val partitionLogMsgPrefix = if (deferredBatches)
+                s"Apply deferred follower partition $topicPartition"
+              else
+                s"Metadata batch ${metadataOffset.get} $topicPartition"
+              helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: skipped the " +
+                s"adding-fetcher step of the become-follower state for " +
+                s"$topicPartition since we are shutting down.")
+            }
+          }
+        } else {
+          // we do not need to check if the leader exists again since this has been done at the beginning of this process
+          val partitionsToMakeFollowerWithLeaderAndOffset = partitionsMadeFollower.map { partition =>
+            val leader = allBrokersByIdMap(partition.leaderReplicaIdOpt.get).brokerEndPoint(helper.config.interBrokerListenerName)
+            val log = partition.localLogOrException
+            val fetchOffset = helper.initialFetchOffset(log)
+            partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
+          }.toMap
+
+          helper.replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+        }
+      }
+    } catch {
+      case e: Throwable =>
+        helper.stateChangeLogger.error(s"$topLevelLogPrefix: error while processing batch", e)
+        // Re-throw the exception for it to be caught in BrokerMetadataListener
+        throw e
+    }
+
+    if (traceLoggingEnabled)
+      partitionsMadeFollower.foreach { partition =>
+        val topicPartition = partition.topicPartition
+        val state = partitionStates(partition)
+        val partitionLogMsgPrefix = if (deferredBatches)
+          s"Apply deferred follower partition $topicPartition"
+        else
+          s"Metadata batch ${metadataOffset.get} $topicPartition"
+        helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: completed become-follower " +
+          s"transition for partition $topicPartition with new leader ${state.leaderId}")
+      }
+
+    partitionsMadeFollower
+  }
+}
diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
new file mode 100644
index 0000000..255b349
--- /dev/null
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.metadata.{ConfigRepository, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.Scheduler
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: RaftMetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  def this(config: KafkaConfig,
+           metrics: Metrics,
+           time: Time,
+           scheduler: Scheduler,
+           logManager: LogManager,
+           isShuttingDown: AtomicBoolean,
+           quotaManagers: QuotaManagers,
+           brokerTopicStats: BrokerTopicStats,
+           metadataCache: RaftMetadataCache,
+           logDirFailureChannel: LogDirFailureChannel,
+           alterIsrManager: AlterIsrManager,
+           configRepository: ConfigRepository,
+           threadNamePrefix: Option[String] = None) = {
+    this(config, metrics, time, scheduler, logManager, isShuttingDown,
+      quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
+      DelayedOperationPurgatory[DelayedProduce](
+        purgatoryName = "Produce", brokerId = config.brokerId,
+        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedFetch](
+        purgatoryName = "Fetch", brokerId = config.brokerId,
+        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedDeleteRecords](
+        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedElectLeader](
+        purgatoryName = "ElectLeader", brokerId = config.brokerId),
+      threadNamePrefix, configRepository, alterIsrManager)
+  }
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  class RaftReplicaManagerChangeDelegateHelper(raftReplicaManager: RaftReplicaManager) extends RaftReplicaChangeDelegateHelper {
+    override def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = raftReplicaManager.completeDelayedFetchOrProduceRequests(topicPartition)
+
+    override def config: KafkaConfig = raftReplicaManager.config
+
+    override def error(msg: => String, e: => Throwable): Unit = raftReplicaManager.error(msg, e)
+
+    override def getLogDir(topicPartition: TopicPartition): Option[String] = raftReplicaManager.getLogDir(topicPartition)
+
+    override def initialFetchOffset(log: Log): Long = raftReplicaManager.initialFetchOffset(log)
+
+    override def isShuttingDown: Boolean = raftReplicaManager.isShuttingDown.get
+
+    override def markDeferred(state: HostedPartition.Deferred): Unit = raftReplicaManager.markPartitionDeferred(state)
+
+    override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+
+    override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
+
+    override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager
+
+    override def stateChangeLogger: StateChangeLogger = raftReplicaManager.stateChangeLogger
+  }
+
+  // visible/overwriteable for testing, generally will not change otherwise
+  private[server] var delegate = new RaftReplicaChangeDelegate(new RaftReplicaManagerChangeDelegateHelper(this))
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+    val startMs = time.milliseconds()
+    val partitionsMadeFollower = mutable.Set[Partition]()
+    val partitionsMadeLeader = mutable.Set[Partition]()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val metadataImage = metadataCache.currentImage()
+      val brokers = metadataImage.brokers
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+        }
+
+        val partitionsMadeLeader = if (leaderPartitionStates.nonEmpty)
+          delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
+        else
+          Set.empty[Partition]
+        val partitionsMadeFollower = if (followerPartitionStates.nonEmpty)
+          delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
+        else
+          Set.empty[Partition]
+
+        // We need to transition anything that hasn't transitioned from Deferred to Offline to the Online state.
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          allPartitions.put(partition.topicPartition, HostedPartition.Online(partition))
+        }
+
+        updateLeaderAndFollowerMetrics(partitionsMadeFollower.map(_.topic).toSet)
+
+        maybeAddLogDirFetchers(partitionsMadeFollower, highWatermarkCheckpoints)
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        if (partitionsMadeLeader.nonEmpty || partitionsMadeFollower.nonEmpty) {
+          onLeadershipChange(partitionsMadeLeader, partitionsMadeFollower)
+        }
+      } catch {
+        case e: Throwable =>
+          deferredPartitionsIterator.foreach { metadata =>
+            val partition = metadata.partition
+            val state = cachedState(metadataImage, partition)
+            val topicPartition = partition.topicPartition
+            val leader = state.leaderId == localBrokerId
+            val leaderOrFollower = if (leader) "leader" else "follower"
+            val partitionLogMsgPrefix = s"Apply deferred $leaderOrFollower partition $topicPartition"
+            stateChangeLogger.error(s"$partitionLogMsgPrefix: error while applying deferred metadata.", e)
+          }
+          stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
+            s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
+          // Re-throw the exception for it to be caught in BrokerMetadataListener
+          throw e
+      }
+      deferringMetadataChanges = false
+    }
+    val endMs = time.milliseconds()
+    val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions: " +
+      s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)" +
+      s"in $elapsedMs ms")
+    stateChangeLogger.info("Metadata changes are no longer being deferred")
+  }
+
+  /**
+   * Handle changes made by a batch of metadata log records.
+   *
+   * @param imageBuilder       The MetadataImage builder.
+   * @param metadataOffset     The last offset in the batch of records.
+   * @param onLeadershipChange The callbacks to invoke when leadership changes.
+   */
+  def handleMetadataRecords(imageBuilder: MetadataImageBuilder,
+                            metadataOffset: Long,
+                            onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+    val startMs = time.milliseconds()
+    val builder = imageBuilder.partitionsBuilder()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(("Metadata batch %d: %d local partition(s) changed, %d " +
+        "local partition(s) removed.").format(metadataOffset, builder.localChanged().size,
+        builder.localRemoved().size))
+      if (stateChangeLogger.isTraceEnabled) {
+        builder.localChanged().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally changed: ${state}")
+        }
+        builder.localRemoved().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally removed: ${state}")
+        }
+      }
+      if (deferringMetadataChanges) {
+        val prevPartitions = imageBuilder.prevImage.partitions
+        // partitionChangesToBeDeferred maps each partition to be deferred to whether it is new (i.e. existed before deferral began)
+        val partitionChangesToBeDeferred = mutable.HashMap[Partition, Boolean]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val (partition, priorDeferredMetadata) = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              (None, None)
+
+            case HostedPartition.Online(partition) => (Some(partition), None)
+            case deferred@HostedPartition.Deferred(partition, _) => (Some(partition), Some(deferred))
+
+            case HostedPartition.None =>
+              // Create the partition instance since it does not yet exist
+              (Some(Partition(topicPartition, time, configRepository, this)), None)
+          }
+          partition.foreach { partition =>
+            val isNew = priorDeferredMetadata match {
+              case Some(alreadyDeferred) => alreadyDeferred.isNew
+              case _ => prevPartitions.topicPartition(topicPartition.topic(), topicPartition.partition()).isEmpty
+            }
+            partitionChangesToBeDeferred.put(partition, isNew)
+          }
+        }
+
+        stateChangeLogger.info(s"Deferring metadata changes for ${partitionChangesToBeDeferred.size} partition(s)")
+        if (partitionChangesToBeDeferred.nonEmpty) {
+          delegate.makeDeferred(partitionChangesToBeDeferred, metadataOffset)
+        }
+      } else { // not deferring changes, so make leaders/followers accordingly
+        val partitionsToBeLeader = mutable.HashMap[Partition, MetadataPartition]()
+        val partitionsToBeFollower = mutable.HashMap[Partition, MetadataPartition]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val partition = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              None
+
+            case HostedPartition.Online(partition) => Some(partition)
+            case _: HostedPartition.Deferred => throw new IllegalStateException(
+              s"There should never be deferred partition metadata when we aren't deferring changes: $topicPartition")
+
+            case HostedPartition.None =>
+              // it's a partition that we don't know about yet, so create it and mark it online
+              val partition = Partition(topicPartition, time, configRepository, this)
+              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+              Some(partition)
+          }
+          partition.foreach { partition =>
+            if (currentState.leaderId == localBrokerId) {
+              partitionsToBeLeader.put(partition, currentState)
+            } else {
+              partitionsToBeFollower.put(partition, currentState)
+            }
+          }
+        }
+
+        val prevPartitions = imageBuilder.prevImage.partitions
+        val changedPartitionsPreviouslyExisting = mutable.Set[MetadataPartition]()
+        builder.localChanged().foreach(metadataPartition =>
+          prevPartitions.topicPartition(metadataPartition.topicName, metadataPartition.partitionIndex).foreach(
+            changedPartitionsPreviouslyExisting.add))
+        val nextBrokers = imageBuilder.brokers()
+        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
+          delegate.makeLeaders(changedPartitionsPreviouslyExisting, partitionsToBeLeader, highWatermarkCheckpoints,
+            Some(metadataOffset))
+        else
+          Set.empty[Partition]
+        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
+          delegate.makeFollowers(changedPartitionsPreviouslyExisting, nextBrokers, partitionsToBeFollower, highWatermarkCheckpoints,
+            Some(metadataOffset))
+        else
+          Set.empty[Partition]
+        updateLeaderAndFollowerMetrics(partitionsBecomeFollower.map(_.topic).toSet)
+
+        builder.localChanged().foreach { state =>
+          val topicPartition = state.toTopicPartition
+          /*
+          * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
+          * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
+          * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+          * we need to map this topic-partition to OfflinePartition instead.
+          */
+          if (localLog(topicPartition).isEmpty) {
+            markPartitionOffline(topicPartition)
+          }
+        }
+
+        maybeAddLogDirFetchers(partitionsBecomeFollower, highWatermarkCheckpoints)
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+      }
+      // TODO: we should move aside log directories which have been deleted rather than
+      // purging them from the disk immediately.
+      if (builder.localRemoved().nonEmpty) {
+        // we schedule removal immediately even if we are deferring changes
+        stopPartitions(builder.localRemoved().map(_.toTopicPartition -> true).toMap).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
+            stateChangeLogger.error(s"Metadata batch $metadataOffset: unable to delete " +
+              s"${topicPartition} as the local replica for the partition is in an offline " +
+              "log directory")
+          } else {
+            stateChangeLogger.error(s"Metadata batch $metadataOffset: unable to delete " +
+              s"${topicPartition} due to an unexpected ${e.getClass.getName} exception: " +
+              s"${e.getMessage}")
+          }
+        }
+      }
+    }
+    val endMs = time.milliseconds()
+    val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Metadata batch $metadataOffset: handled replica changes " +
+      s"in ${elapsedMs} ms")
+  }
+
+  def markPartitionDeferred(partition: Partition, isNew: Boolean): Unit = {
+    markPartitionDeferred(HostedPartition.Deferred(partition, isNew))
+  }
+
+  private def markPartitionDeferred(state: HostedPartition.Deferred): Unit = replicaStateChangeLock synchronized {
+    allPartitions.put(state.partition.topicPartition, state)
+  }
+
+  // An iterator over all deferred partitions. This is a weakly consistent iterator; a partition made off/online
+  // after the iterator has been constructed could still be returned by this iterator.
+  private def deferredPartitionsIterator: Iterator[HostedPartition.Deferred] = {
+    allPartitions.values.iterator.flatMap {
+      case deferred: HostedPartition.Deferred => Some(deferred)
+      case _ => None
+    }
+  }
+
+  private def cachedState(metadataImage: MetadataImage, partition: Partition): MetadataPartition = {
+    metadataImage.partitions.topicPartition(partition.topic, partition.partitionId).getOrElse(
+      throw new IllegalStateException(s"Partition has metadata changes but does not exist in the metadata cache: ${partition.topicPartition}"))
+  }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e639727..3f52e22 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -163,7 +163,7 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
 sealed trait HostedPartition
 
 /**
- * Trait to represent a partition that isn't Offline -- i.e. it is either Online or it is Deferred
+ * Trait to represent a partition that isn't Offline -- i.e. it is either Online or it is Deferred.
  */
 sealed trait NonOffline extends HostedPartition {
   val partition: Partition
@@ -180,12 +180,13 @@ object HostedPartition {
   final case class Online(partition: Partition) extends NonOffline
 
   /**
-   * This broker hosted the partition but it is deferring metadata changes
-   * until it catches up on the Raft-based metadata log.
-   * This state only applies to brokers that are using a Raft-based metadata
-   * quorum; it never happens when using ZooKeeper.
+   * This broker hosted the partition (or will soon host it if it is new) but
+   * it is deferring metadata changes until it catches up on the Raft-based metadata
+   * log.  This state only applies to brokers that are using a Raft-based metadata
+   * quorum; it never happens when using ZooKeeper.  The isNew value indicates
+   * if the partition needs to be created when we apply the deferred changes.
    */
-  final case class Deferred(partition: Partition) extends NonOffline
+  final case class Deferred(partition: Partition, isNew: Boolean) extends NonOffline
 
   /**
    * This broker hosts the partition, but it is in an offline log directory.
@@ -247,20 +248,20 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /* epoch of the controller that last changed the leader */
-  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
-  private val localBrokerId = config.brokerId
-  private val allPartitions = new Pool[TopicPartition, HostedPartition](
+  @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
+  protected val localBrokerId = config.brokerId
+  protected val allPartitions = new Pool[TopicPartition, HostedPartition](
     valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, configRepository, this)))
   )
-  private val replicaStateChangeLock = new Object
+  protected val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
-  val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
+  private[server] val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  @volatile var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
+  @volatile private[server] var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
     (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
 
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
-  private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
+  protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
 
   private var logDirFailureHandler: LogDirFailureHandler = null
 
@@ -275,33 +276,6 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
-  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
-  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
-  // online partitions to the deferred state if we see a metadata update for that partition.
-  private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
-  stateChangeLogger.debug(s"Metadata changes deferred=$deferringMetadataChanges")
-
-  def beginMetadataChangeDeferral(): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
-    }
-    replicaStateChangeLock synchronized {
-      deferringMetadataChanges = true
-      stateChangeLogger.info(s"Metadata changes are now being deferred")
-    }
-  }
-
-  def endMetadataChangeDeferral(): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
-    }
-    replicaStateChangeLock synchronized {
-      deferringMetadataChanges = false
-      stateChangeLogger.info(s"Metadata changes are no longer being deferred")
-    }
-  }
-
   // Visible for testing
   private[server] val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()
 
@@ -367,7 +341,7 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.removeMetrics(topic)
   }
 
-  private def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
+  protected def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
@@ -399,9 +373,9 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         this.controllerEpoch = controllerEpoch
 
-        val stoppedPartitions = mutable.Map.empty[TopicPartition, StopReplicaPartitionState]
+        val stoppedPartitions = mutable.Map.empty[TopicPartition, Boolean]
         partitionStates.forKeyValue { (topicPartition, partitionState) =>
-          val deletePartition = partitionState.deletePartition
+          val deletePartition = partitionState.deletePartition()
 
           getPartition(topicPartition) match {
             case HostedPartition.Offline =>
@@ -421,7 +395,7 @@ class ReplicaManager(val config: KafkaConfig,
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
                   requestLeaderEpoch > currentLeaderEpoch) {
-                stoppedPartitions += topicPartition -> partitionState
+                stoppedPartitions += topicPartition -> deletePartition
                 // Assume that everything will go right. It is overwritten in case of an error.
                 responseMap.put(topicPartition, Errors.NONE)
               } else if (requestLeaderEpoch < currentLeaderEpoch) {
@@ -439,71 +413,84 @@ class ReplicaManager(val config: KafkaConfig,
                 responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
               }
 
-            case HostedPartition.Deferred(_) =>
+            case _: HostedPartition.Deferred =>
               throw new IllegalStateException("We should never be deferring partition metadata changes and stopping a replica when using ZooKeeper")
 
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition -> deletePartition
               responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
-        // First stop fetchers for all partitions.
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
-
-        // Second remove deleted partitions from the partition map. Fetchers rely on the
-        // ReplicaManager to get Partition's information so they must be stopped first.
-        val deletedPartitions = mutable.Set.empty[TopicPartition]
-        stoppedPartitions.forKeyValue { (topicPartition, partitionState) =>
-          if (partitionState.deletePartition) {
-            getPartition(topicPartition) match {
-              case hostedPartition@HostedPartition.Online(partition) =>
-                if (allPartitions.remove(topicPartition, hostedPartition)) {
-                  maybeRemoveTopicMetrics(topicPartition.topic)
-                  // Logs are not deleted here. They are deleted in a single batch later on.
-                  // This is done to avoid having to checkpoint for every deletions.
-                  partition.delete()
-                }
-
-              case _ =>
-            }
-
-            deletedPartitions += topicPartition
-          }
-
-          // If we were the leader, we may have some operations still waiting for completion.
-          // We force completion to prevent them from timing out.
-          completeDelayedFetchOrProduceRequests(topicPartition)
-        }
-
-        // Third delete the logs and checkpoint.
-        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
-          exception match {
-            case e: KafkaStorageException =>
+        stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
               stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                 "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-
-            case e =>
-              stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
+          } else {
+            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
                 s"${e.getClass.getName} exception: ${e.getMessage}")
               responseMap.put(topicPartition, Errors.forException(e))
           }
-        })
-
+          responseMap.put(topicPartition, Errors.forException(e))
+        }
         (responseMap, Errors.NONE)
       }
     }
   }
 
+  /**
+   * Stop the given partitions.
+   *
+   * @param partitionsToStop    A map from a topic partition to a boolean indicating
+   *                            whether the partition should be deleted.
+   *
+   * @return                    A map from partitions to exceptions which occurred.
+   *                            If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+    // First stop fetchers for all partitions.
+    val partitions = partitionsToStop.keySet
+    replicaFetcherManager.removeFetcherForPartitions(partitions)
+    replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+    // Second remove deleted partitions from the partition map. Fetchers rely on the
+    // ReplicaManager to get Partition's information so they must be stopped first.
+    val partitionsToDelete = mutable.Set.empty[TopicPartition]
+    partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
+      if (shouldDelete) {
+        getPartition(topicPartition) match {
+          case hostedPartition: NonOffline =>
+            if (allPartitions.remove(topicPartition, hostedPartition)) {
+              maybeRemoveTopicMetrics(topicPartition.topic)
+              // Logs are not deleted here. They are deleted in a single batch later on.
+              // This is done to avoid having to checkpoint for every deletions.
+              hostedPartition.partition.delete()
+            }
+
+          case _ =>
+        }
+        partitionsToDelete += topicPartition
+      }
+      // If we were the leader, we may have some operations still waiting for completion.
+      // We force completion to prevent them from timing out.
+      completeDelayedFetchOrProduceRequests(topicPartition)
+    }
+
+    // Third delete the logs and checkpoint.
+    val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    if (partitionsToDelete.nonEmpty) {
+      // Delete the logs and checkpoint.
+      logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
+    }
+    errorMap
+  }
+
   def getPartition(topicPartition: TopicPartition): HostedPartition = {
     Option(allPartitions.get(topicPartition)).getOrElse(HostedPartition.None)
   }
@@ -569,7 +556,7 @@ class ReplicaManager(val config: KafkaConfig,
         // the local replica has been deleted.
         Left(Errors.NOT_LEADER_OR_FOLLOWER)
 
-      case HostedPartition.Deferred(_) =>
+      case _: HostedPartition.Deferred =>
         // The topic exists, but this broker is deferring metadata changes for it, so we return NOT_LEADER_OR_FOLLOWER
         // which forces clients to refresh metadata.
         Left(Errors.NOT_LEADER_OR_FOLLOWER)
@@ -766,7 +753,7 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.Offline =>
               throw new KafkaStorageException(s"Partition $topicPartition is offline")
 
-            case HostedPartition.Deferred(_) =>
+            case _: HostedPartition.Deferred =>
               throw new IllegalStateException(s"Partition $topicPartition is deferred")
 
             case HostedPartition.None => // Do nothing
@@ -1365,7 +1352,7 @@ class ReplicaManager(val config: KafkaConfig,
                 responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
                 None
 
-              case HostedPartition.Deferred(_) =>
+              case _: HostedPartition.Deferred =>
                 throw new IllegalStateException("We should never be deferring partition metadata changes and becoming a leader or follower when using ZooKeeper")
 
               case HostedPartition.Online(partition) =>
@@ -1426,18 +1413,8 @@ class ReplicaManager(val config: KafkaConfig,
           else
             Set.empty[Partition]
 
-          /*
-         * KAFKA-8392
-         * For topic partitions of which the broker is no longer a leader, delete metrics related to
-         * those topics. Note that this means the broker stops being either a replica or a leader of
-         * partitions of said topics
-         */
-          val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
           val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
-          followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
-
-          // remove metrics for brokers which are not followers of a topic
-          leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
+          updateLeaderAndFollowerMetrics(followerTopicSet)
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>
             val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
@@ -1520,7 +1497,21 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def maybeAddLogDirFetchers(partitions: Set[Partition],
+  /**
+   * KAFKA-8392
+   * For topic partitions of which the broker is no longer a leader, delete metrics related to
+   * those topics. Note that this means the broker stops being either a replica or a leader of
+   * partitions of said topics
+   */
+  protected def updateLeaderAndFollowerMetrics(newFollowerTopics: Set[String]): Unit = {
+    val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
+    newFollowerTopics.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
+
+    // remove metrics for brokers which are not followers of a topic
+    leaderTopicSet.diff(newFollowerTopics).foreach(brokerTopicStats.removeOldFollowerMetrics)
+  }
+
+  protected def maybeAddLogDirFetchers(partitions: Set[Partition],
                                      offsetCheckpoints: OffsetCheckpoints): Unit = {
     val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
     for (partition <- partitions) {
@@ -1753,7 +1744,7 @@ class ReplicaManager(val config: KafkaConfig,
    * diverging epoch is returned in the response, avoiding the need for a separate
    * OffsetForLeaderEpoch request.
    */
-  private def initialFetchOffset(log: Log): Long = {
+  protected def initialFetchOffset(log: Log): Long = {
     if (ApiVersion.isTruncationOnFetchSupported(config.interBrokerProtocolVersion) && log.latestEpoch.nonEmpty)
       log.logEndOffset
     else
@@ -1841,7 +1832,6 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  // Used only by test
   def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
     allPartitions.put(tp, HostedPartition.Offline)
     Partition.removeMetrics(tp)
@@ -1964,7 +1954,7 @@ class ReplicaManager(val config: KafkaConfig,
               .setPartition(offsetForLeaderPartition.partition)
               .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
 
-          case HostedPartition.Deferred(_) =>
+          case _: HostedPartition.Deferred =>
             new EpochEndOffset()
               .setPartition(offsetForLeaderPartition.partition)
               .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataImage.scala b/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
index 1993065..f4e5831 100755
--- a/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
@@ -86,12 +86,15 @@ case class MetadataImageBuilder(brokerId: Int,
     } else {
       _partitionsBuilder.build()
     }
-    val nextBrokers = if (_brokersBuilder == null) {
+    MetadataImage(nextPartitions, _controllerId, brokers())
+  }
+
+  def brokers(): MetadataBrokers = {
+    if (_brokersBuilder == null) {
       prevImage.brokers
     } else {
       _brokersBuilder.build()
     }
-    MetadataImage(nextPartitions, _controllerId, nextBrokers)
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
new file mode 100644
index 0000000..d3d6471
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.metadata.{CachedConfigRepository, MetadataBroker, MetadataBrokers, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.metadata.PartitionRecord
+import org.apache.kafka.common.metrics.Metrics
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, never, verify, when}
+import org.slf4j.Logger
+
+import scala.collection.{Set, mutable}
+
+trait LeadershipChangeHandler {
+  def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit
+}
+
+class RaftReplicaManagerTest {
+  private var alterIsrManager: AlterIsrManager = _
+  private var config: KafkaConfig = _
+  private val configRepository = new CachedConfigRepository()
+  private val metrics = new Metrics
+  private var quotaManager: QuotaManagers = _
+  private val time = new MockTime
+  private var mockDelegate: RaftReplicaChangeDelegate = _
+  private var imageBuilder: MetadataImageBuilder = _
+  private val brokerId0 = 0
+  private val metadataBroker0 = new MetadataBroker(brokerId0, null, Map.empty, false)
+  private val brokerId1 = 1
+  private val metadataBroker1 = new MetadataBroker(brokerId1, null, Map.empty, false)
+  private val topicName = "topicName"
+  private val topicId = Uuid.randomUuid()
+  private val partitionId0 = 0
+  private val partitionId1 = 1
+  private val topicPartition0 = new TopicPartition(topicName, partitionId0)
+  private val topicPartition1 = new TopicPartition(topicName, partitionId1)
+  private val topicPartitionRecord0 = new PartitionRecord()
+    .setPartitionId(partitionId0)
+    .setTopicId(topicId)
+    .setReplicas(util.Arrays.asList(brokerId0, brokerId1))
+    .setLeader(brokerId0)
+    .setLeaderEpoch(0)
+  private val topicPartitionRecord1 = new PartitionRecord()
+    .setPartitionId(partitionId1)
+    .setTopicId(topicId)
+    .setReplicas(util.Arrays.asList(brokerId0, brokerId1))
+    .setLeader(brokerId1)
+    .setLeaderEpoch(0)
+  private val offset1 = 1L
+  private val metadataPartition0 = MetadataPartition(topicName, topicPartitionRecord0)
+  private val metadataPartition1 = MetadataPartition(topicName, topicPartitionRecord1)
+  private var onLeadershipChangeHandler: LeadershipChangeHandler = _
+  private var onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit = _
+  private var metadataCache: RaftMetadataCache = _
+
+  @BeforeEach
+  def setUp(): Unit = {
+    alterIsrManager = mock(classOf[AlterIsrManager])
+    config = KafkaConfig.fromProps({
+      val nodeId = brokerId0
+      val props = TestUtils.createBrokerConfig(nodeId, "")
+      props.put(KafkaConfig.ProcessRolesProp, "broker")
+      props.put(KafkaConfig.NodeIdProp, nodeId.toString)
+      props
+    })
+    metadataCache = new RaftMetadataCache(config.brokerId)
+    quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+    mockDelegate = mock(classOf[RaftReplicaChangeDelegate])
+    imageBuilder = MetadataImageBuilder(brokerId0, mock(classOf[Logger]), new MetadataImage())
+    onLeadershipChangeHandler = mock(classOf[LeadershipChangeHandler])
+    onLeadershipChange = onLeadershipChangeHandler.onLeadershipChange _
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    TestUtils.clearYammerMetrics()
+    Option(quotaManager).foreach(_.shutdown())
+    metrics.close()
+  }
+
+  def createRaftReplicaManager(): RaftReplicaManager = {
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    new RaftReplicaManager(config, metrics, time, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
+      metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager,
+      configRepository, None)
+  }
+
+  @Test
+  def testRejectsZkConfig(): Unit = {
+    assertThrows(classOf[IllegalStateException], () => {
+      val zkConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, ""))
+      val mockLogMgr = TestUtils.createLogManager(zkConfig.logDirs.map(new File(_)))
+      new RaftReplicaManager(zkConfig, metrics, time, new MockScheduler(time), mockLogMgr,
+        new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
+        metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager,
+        configRepository)
+    })
+  }
+
+  @Test
+  def testDefersChangesImmediatelyThenAppliesChanges(): Unit = {
+    val rrm = createRaftReplicaManager()
+    rrm.delegate = mockDelegate
+    val partition0 =  Partition(topicPartition0, time, configRepository, rrm)
+    val partition1 =  Partition(topicPartition1, time, configRepository, rrm)
+
+    processTopicPartitionMetadata(rrm)
+    // verify changes would have been deferred
+    val partitionsNewMapCaptor: ArgumentCaptor[mutable.Map[Partition, Boolean]] =
+      ArgumentCaptor.forClass(classOf[mutable.Map[Partition, Boolean]])
+    verify(mockDelegate).makeDeferred(partitionsNewMapCaptor.capture(), ArgumentMatchers.eq(offset1))
+    val partitionsDeferredMap = partitionsNewMapCaptor.getValue
+    assertEquals(Map(partition0 -> true, partition1 -> true), partitionsDeferredMap)
+    verify(mockDelegate, never()).makeFollowers(any(), any(), any(), any(), any())
+
+    // now mark those topic partitions as being deferred so we can later apply the changes
+    rrm.markPartitionDeferred(partition0, isNew = true)
+    rrm.markPartitionDeferred(partition1, isNew = true)
+
+    // apply the changes
+    // define some return values to avoid NPE
+    when(mockDelegate.makeLeaders(any(), any(), any(), any())).thenReturn(Set(partition0))
+    when(mockDelegate.makeFollowers(any(), any(), any(), any(), any())).thenReturn(Set(partition1))
+    rrm.endMetadataChangeDeferral(onLeadershipChange)
+    // verify that the deferred changes would have been applied
+
+    // leaders...
+    val leaderPartitionStates = verifyMakeLeaders(mutable.Set(), None)
+    assertEquals(Map(partition0 -> metadataPartition0), leaderPartitionStates)
+
+    // followers...
+    val followerPartitionStates = verifyMakeFollowers(mutable.Set(), Set(brokerId0, brokerId1), None)
+    assertEquals(Map(partition1 -> metadataPartition1), followerPartitionStates)
+
+    // leadership change callbacks
+    verifyLeadershipChangeCallbacks(List(partition0), List(partition1))
+  }
+
+  @Test
+  def testAppliesChangesWhenNotDeferring(): Unit = {
+    val rrm = createRaftReplicaManager()
+    rrm.delegate = mockDelegate
+    val partition0 = Partition(topicPartition0, time, configRepository, rrm)
+    val partition1 = Partition(topicPartition1, time, configRepository, rrm)
+    rrm.endMetadataChangeDeferral(onLeadershipChange)
+
+    // define some return values to avoid NPE
+    when(mockDelegate.makeLeaders(any(), any(), any(), ArgumentMatchers.eq(Some(offset1)))).thenReturn(Set(partition0))
+    when(mockDelegate.makeFollowers(any(), any(), any(), any(), ArgumentMatchers.eq(Some(offset1)))).thenReturn(Set(partition1))
+    // process the changes
+    processTopicPartitionMetadata(rrm)
+    // verify that the changes would have been applied
+
+    // leaders...
+    val leaderPartitionStates = verifyMakeLeaders(mutable.Set(), Some(offset1))
+    assertEquals(Map(partition0 -> metadataPartition0), leaderPartitionStates)
+
+    // followers...
+    val followerPartitionStates = verifyMakeFollowers(mutable.Set(), Set(brokerId0, brokerId1), Some(offset1))
+    assertEquals(Map(partition1 -> metadataPartition1), followerPartitionStates)
+
+    // leadership change callbacks
+    verifyLeadershipChangeCallbacks(List(partition0), List(partition1))
+  }
+
+  private def verifyMakeLeaders(expectedPrevPartitionsAlreadyExisting: Set[MetadataPartition],
+                                expectedMetadataOffset: Option[Long]): mutable.Map[Partition, MetadataPartition] = {
+    val leaderPartitionStatesCaptor: ArgumentCaptor[mutable.Map[Partition, MetadataPartition]] =
+      ArgumentCaptor.forClass(classOf[mutable.Map[Partition, MetadataPartition]])
+    verify(mockDelegate).makeLeaders(ArgumentMatchers.eq(expectedPrevPartitionsAlreadyExisting),
+      leaderPartitionStatesCaptor.capture(), any(), ArgumentMatchers.eq(expectedMetadataOffset))
+    leaderPartitionStatesCaptor.getValue
+  }
+
+  private def verifyMakeFollowers(expectedPrevPartitionsAlreadyExisting: Set[MetadataPartition],
+                                  expectedBrokers: Set[Int],
+                                  expectedMetadataOffset: Option[Long]): mutable.Map[Partition, MetadataPartition] = {
+    val followerPartitionStatesCaptor: ArgumentCaptor[mutable.Map[Partition, MetadataPartition]] =
+      ArgumentCaptor.forClass(classOf[mutable.Map[Partition, MetadataPartition]])
+    val brokersCaptor: ArgumentCaptor[MetadataBrokers] = ArgumentCaptor.forClass(classOf[MetadataBrokers])
+    verify(mockDelegate).makeFollowers(ArgumentMatchers.eq(expectedPrevPartitionsAlreadyExisting), brokersCaptor.capture(),
+      followerPartitionStatesCaptor.capture(), any(), ArgumentMatchers.eq(expectedMetadataOffset))
+    val brokers = brokersCaptor.getValue
+    assertEquals(expectedBrokers.size, brokers.size())
+    expectedBrokers.foreach(brokerId => assertTrue(brokers.aliveBroker(brokerId).isDefined))
+    followerPartitionStatesCaptor.getValue
+  }
+
+  private def verifyLeadershipChangeCallbacks(expectedUpdatedLeaders: List[Partition], expectedUpdatedFollowers: List[Partition]): Unit = {
+    val updatedLeadersCaptor: ArgumentCaptor[Iterable[Partition]] = ArgumentCaptor.forClass(classOf[Iterable[Partition]])
+    val updatedFollowersCaptor: ArgumentCaptor[Iterable[Partition]] = ArgumentCaptor.forClass(classOf[Iterable[Partition]])
+    verify(onLeadershipChangeHandler).onLeadershipChange(updatedLeadersCaptor.capture(), updatedFollowersCaptor.capture())
+    assertEquals(expectedUpdatedLeaders, updatedLeadersCaptor.getValue.toList)
+    assertEquals(expectedUpdatedFollowers, updatedFollowersCaptor.getValue.toList)
+  }
+
+  private def processTopicPartitionMetadata(raftReplicaManager: RaftReplicaManager): Unit = {
+    // create brokers
+    imageBuilder.brokersBuilder().add(metadataBroker0)
+    imageBuilder.brokersBuilder().add(metadataBroker1)
+    // create topic
+    imageBuilder.partitionsBuilder().addUuidMapping(topicName, topicId)
+    // create deferred partitions
+    imageBuilder.partitionsBuilder().set(metadataPartition0)
+    imageBuilder.partitionsBuilder().set(metadataPartition1)
+    // apply the changes to metadata cache
+    metadataCache.image(imageBuilder.build())
+    // apply the changes to replica manager
+    raftReplicaManager.handleMetadataRecords(imageBuilder, offset1, onLeadershipChange)
+  }
+}


Mime
View raw message