kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.8 updated: MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
Date Fri, 19 Feb 2021 19:16:59 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new e39bb7a  MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
e39bb7a is described below

commit e39bb7a31cb65a59cd0b1263f382b072a15fa955
Author: Ron Dagostino <rdagostino@confluent.io>
AuthorDate: Thu Feb 18 14:57:29 2021 -0500

    MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
    
    A Raft-based broker is unable to restart if the broker defers partition
    metadata changes for a __consumer_offsets topic-partition. The issue is
    that GroupMetadataManager is asked to removeGroupsForPartition() upon
    the broker becoming a follower, but in order for that code to function
    it requires that the manager's scheduler be started. There are multiple
    possible solutions here since removeGroupsForPartition() is a no-op at
    this point in the broker startup cycle (nothing has been loaded, so
    there is nothing to unload). We could just not invoke the callback. But
    it seems more reasonable to not special-case this and instead start
    ReplicaManager and the coordinators just before applying the deferred
    partitions states.
    
    We also mark deferred partitions for which we are a follower as being
    online a bit earlier to avoid NotLeaderOrFollowerException that was
    being thrown upon restart. Fixing this issue exposed the above issue
    regarding the scheduler not being started.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
---
 core/src/main/scala/kafka/server/BrokerServer.scala              | 6 ++++--
 core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala | 4 ++++
 core/src/main/scala/kafka/server/RaftReplicaManager.scala        | 2 ++
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 19d65ab..9aae5e3 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -349,14 +349,16 @@ class BrokerServer(
       // Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required.
       logManager.startup(metadataCache.getAllTopics())
       // Start other services that we've delayed starting, in the appropriate order.
-      replicaManager.endMetadataChangeDeferral(
-        RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
       replicaManager.startup()
       replicaManager.startHighWatermarkCheckPointThread()
       groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
         getOrElse(config.offsetsTopicPartitions))
       transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
         getOrElse(config.transactionTopicPartitions))
+      // Apply deferred partition metadata changes after starting replica manager and coordinators
+      // so that those services are ready and able to process the changes.
+      replicaManager.endMetadataChangeDeferral(
+        RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
 
       socketServer.startProcessingRequests(authorizerFutures)
 
diff --git a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
index 0b7ee42..1bf21e0 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
@@ -35,6 +35,7 @@ trait RaftReplicaChangeDelegateHelper {
   def getLogDir(topicPartition: TopicPartition): Option[String]
   def error(msg: => String, e: => Throwable): Unit
   def markOffline(topicPartition: TopicPartition): Unit
+  def markOnline(partition: Partition): Unit
   def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit
   def isShuttingDown: Boolean
   def initialFetchOffset(log: Log): Long
@@ -216,6 +217,9 @@ class RaftReplicaChangeDelegate(helper: RaftReplicaChangeDelegateHelper) {
             val leader = allBrokersByIdMap(partition.leaderReplicaIdOpt.get).brokerEndPoint(helper.config.interBrokerListenerName)
             val log = partition.localLogOrException
             val fetchOffset = helper.initialFetchOffset(log)
+            if (deferredBatches) {
+              helper.markOnline(partition)
+            }
             partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
           }.toMap
 
diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
index 255b349..143709d 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -104,6 +104,8 @@ class RaftReplicaManager(config: KafkaConfig,
 
     override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
 
+    override def markOnline(partition: Partition): Unit = raftReplicaManager.allPartitions.put(partition.topicPartition, HostedPartition.Online(partition))
+
     override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
 
     override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager


Mime
View raw message