kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.8 updated: MINOR: Introduce the KIP-500 Broker lifecycle manager (#10095)
Date Thu, 11 Feb 2021 16:40:34 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new ec3fd95  MINOR: Introduce the KIP-500 Broker lifecycle manager (#10095)
ec3fd95 is described below

commit ec3fd95edae22bb7639c71dadfebbb3461891cb1
Author: Colin Patrick McCabe <cmccabe@confluent.io>
AuthorDate: Thu Feb 11 08:32:38 2021 -0800

    MINOR: Introduce the KIP-500 Broker lifecycle manager (#10095)
    
    Add the KIP-500 broker lifecycle manager.  It owns the broker state.  Its inputs are
    messages passed in from other parts of the broker and from the controller: requests to
start
    up, or shut down, for example. Its outputs are the broker state and various futures
    that can be used to wait for broker state transitions to occur.
    
    The lifecycle manager handles registering the broker with the controller, as described
in
    KIP-631. After registration is complete, it handles sending periodic broker heartbeats
and
    processing the responses.
    
    Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ismael@juma.me.uk>,
Ron Dagostino <rdagostino@confluent.io>
---
 .../kafka/server/BrokerLifecycleManager.scala      | 481 +++++++++++++++++++++
 core/src/main/scala/kafka/server/KafkaConfig.scala |  15 +
 .../kafka/server/BrokerLifecycleManagerTest.scala  | 217 ++++++++++
 3 files changed, 713 insertions(+)

diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
new file mode 100644
index 0000000..64c9200
--- /dev/null
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
+import java.util.concurrent.CompletableFuture
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection
+import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerRegistrationRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse,
BrokerRegistrationRequest, BrokerRegistrationResponse}
+import org.apache.kafka.metadata.{BrokerState, VersionRange}
+import org.apache.kafka.queue.EventQueue.DeadlineFunction
+import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import scala.jdk.CollectionConverters._
+
+
/**
 * The broker lifecycle manager owns the broker state.
 *
 * Its inputs are messages passed in from other parts of the broker and from the
 * controller: requests to start up, or shut down, for example. Its outputs are the
 * broker state and various futures that can be used to wait for broker state
 * transitions to occur.
 *
 * The lifecycle manager handles registering the broker with the controller, as described
 * in KIP-631. After registration is complete, it handles sending periodic broker
 * heartbeats and processing the responses.
 *
 * This code uses an event queue paradigm. Modifications get translated into events, which
 * are placed on the queue to be processed sequentially. As described in the JavaDoc for
 * each variable, most mutable state can be accessed only from that event queue thread.
 * In some cases we expose a volatile variable which can be read from any thread, but only
 * written from the event queue thread.
 */
class BrokerLifecycleManager(val config: KafkaConfig,
                             val time: Time,
                             val threadNamePrefix: Option[String]) extends Logging {
  val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")

  this.logIdent = logContext.logPrefix()

  /**
   * The broker id.
   */
  private val nodeId = config.nodeId

  /**
   * The broker rack, or null if there is no configured rack.
   */
  private val rack = config.rack

  /**
   * How long to wait for registration to succeed before failing the startup process.
   */
  private val initialTimeoutNs =
    MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue())

  /**
   * The exponential backoff to use for resending communication.
   */
  private val resendExponentialBackoff =
    new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong, 0.02)

  /**
   * The number of times we've tried and failed to communicate.  This variable can only be
   * read or written from the event queue thread.
   */
  private var failedAttempts = 0L

  /**
   * The broker incarnation ID.  This ID uniquely identifies each time we start the broker.
   */
  val incarnationId = Uuid.randomUuid()

  /**
   * A future which is completed just as soon as the broker has caught up with the latest
   * metadata offset for the first time.
   */
  val initialCatchUpFuture = new CompletableFuture[Void]()

  /**
   * A future which is completed when controlled shutdown is done.
   */
  val controlledShutdownFuture = new CompletableFuture[Void]()

  /**
   * The broker epoch, or -1 if the broker has not yet registered.  This variable can only
   * be written from the event queue thread.
   */
  @volatile private var _brokerEpoch = -1L

  /**
   * The current broker state.  This variable can only be written from the event queue
   * thread.
   */
  @volatile private var _state = BrokerState.NOT_RUNNING

  /**
   * A thread-safe callback function which gives this manager the current highest metadata
   * offset.  This variable can only be read or written from the event queue thread.
   */
  private var _highestMetadataOffsetProvider: () => Long = _

  /**
   * True only if we are ready to unfence the broker.  This variable can only be read or
   * written from the event queue thread.
   */
  private var readyToUnfence = false

  /**
   * True if we have received at least one heartbeat response from the active controller
   * while in the pending controlled shutdown state.  This variable can only be read or
   * written from the event queue thread.
   */
  private var gotControlledShutdownResponse = false

  /**
   * Whether or not this broker is registered with the controller quorum.
   * This variable can only be read or written from the event queue thread.
   */
  private var registered = false

  /**
   * True if the initial registration succeeded.  This variable can only be read or
   * written from the event queue thread.
   */
  private var initialRegistrationSucceeded = false

  /**
   * The cluster ID, or null if this manager has not been started yet.  This variable can
   * only be read or written from the event queue thread.
   */
  private var _clusterId: Uuid = _

  /**
   * The listeners which this broker advertises.  This variable can only be read or
   * written from the event queue thread.
   */
  private var _advertisedListeners: ListenerCollection = _

  /**
   * The features supported by this broker.  This variable can only be read or written
   * from the event queue thread.
   */
  private var _supportedFeatures: util.Map[String, VersionRange] = _

  /**
   * The channel manager, or null if this manager has not been started yet.  This variable
   * can only be read or written from the event queue thread.
   */
  var _channelManager: BrokerToControllerChannelManager = _

  /**
   * The event queue.
   */
  val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))

  /**
   * Start the BrokerLifecycleManager.
   *
   * @param highestMetadataOffsetProvider Provides the current highest metadata offset.
   * @param channelManager                The brokerToControllerChannelManager to use.
   * @param clusterId                     The cluster ID.
   * @param advertisedListeners           The listeners which this broker advertises.
   * @param supportedFeatures             The features which this broker supports.
   */
  def start(highestMetadataOffsetProvider: () => Long,
            channelManager: BrokerToControllerChannelManager,
            clusterId: Uuid,
            advertisedListeners: ListenerCollection,
            supportedFeatures: util.Map[String, VersionRange]): Unit = {
    eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
      channelManager, clusterId, advertisedListeners, supportedFeatures))
  }

  def setReadyToUnfence(): Unit = {
    eventQueue.append(new SetReadyToUnfenceEvent())
  }

  def brokerEpoch(): Long = _brokerEpoch

  def state(): BrokerState = _state

  private class BeginControlledShutdownEvent extends EventQueue.Event {
    override def run(): Unit = {
      _state match {
        case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
          info("Attempted to enter pending controlled shutdown state, but we are " +
          "already in that state.")
        case BrokerState.RUNNING =>
          info("Beginning controlled shutdown.")
          _state = BrokerState.PENDING_CONTROLLED_SHUTDOWN
        case _ =>
          info(s"Skipping controlled shutdown because we are in state ${_state}.")
          beginShutdown()
      }
    }
  }

  /**
   * Enter the controlled shutdown state if we are in RUNNING state.
   * Or, if we're not running, shut down immediately.
   */
  def beginControlledShutdown(): Unit = {
    eventQueue.append(new BeginControlledShutdownEvent())
  }

  /**
   * Start shutting down the BrokerLifecycleManager, but do not block.
   */
  def beginShutdown(): Unit = {
    eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
  }

  /**
   * Shut down the BrokerLifecycleManager and block until all threads are joined.
   */
  def close(): Unit = {
    beginShutdown()
    eventQueue.close()
  }

  private class SetReadyToUnfenceEvent() extends EventQueue.Event {
    override def run(): Unit = {
      readyToUnfence = true
      scheduleNextCommunicationImmediately()
    }
  }

  private class StartupEvent(highestMetadataOffsetProvider: () => Long,
                     channelManager: BrokerToControllerChannelManager,
                     clusterId: Uuid,
                     advertisedListeners: ListenerCollection,
                     supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
    override def run(): Unit = {
      _highestMetadataOffsetProvider = highestMetadataOffsetProvider
      _channelManager = channelManager
      _channelManager.start()
      _state = BrokerState.STARTING
      _clusterId = clusterId
      _advertisedListeners = advertisedListeners.duplicate()
      _supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
      // If the initial registration has not succeeded by the configured deadline,
      // RegistrationTimeoutEvent will shut the manager down.
      eventQueue.scheduleDeferred("initialRegistrationTimeout",
        new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
        new RegistrationTimeoutEvent())
      sendBrokerRegistration()
      info(s"Incarnation ${incarnationId} of broker ${nodeId} in cluster ${clusterId} " +
        "is now STARTING.")
    }
  }

  private def sendBrokerRegistration(): Unit = {
    val features = new BrokerRegistrationRequestData.FeatureCollection()
    _supportedFeatures.asScala.foreach {
      case (name, range) => features.add(new BrokerRegistrationRequestData.Feature().
        setName(name).
        setMinSupportedVersion(range.min()).
        setMaxSupportedVersion(range.max()))
    }
    val data = new BrokerRegistrationRequestData().
        setBrokerId(nodeId).
        setClusterId(_clusterId).
        setFeatures(features).
        setIncarnationId(incarnationId).
        setListeners(_advertisedListeners).
        setRack(rack.orNull)
    if (isTraceEnabled) {
      trace(s"Sending broker registration ${data}")
    }
    _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
      new BrokerRegistrationResponseHandler())
  }

  private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
    override def onComplete(response: ClientResponse): Unit = {
      if (response.authenticationException() != null) {
        error(s"Unable to register broker ${nodeId} because of an authentication exception.",
          response.authenticationException());
        scheduleNextCommunicationAfterFailure()
      } else if (response.versionMismatch() != null) {
        error(s"Unable to register broker ${nodeId} because of an API version problem.",
          response.versionMismatch());
        scheduleNextCommunicationAfterFailure()
      } else if (response.responseBody() == null) {
        warn(s"Unable to register broker ${nodeId}.")
        scheduleNextCommunicationAfterFailure()
      } else if (!response.responseBody().isInstanceOf[BrokerRegistrationResponse]) {
        error(s"Unable to register broker ${nodeId} because the controller returned an " +
          "invalid response type.")
        scheduleNextCommunicationAfterFailure()
      } else {
        val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse]
        val errorCode = Errors.forCode(message.data().errorCode())
        if (errorCode == Errors.NONE) {
          failedAttempts = 0
          _brokerEpoch = message.data().brokerEpoch()
          registered = true
          initialRegistrationSucceeded = true
          info(s"Successfully registered broker ${nodeId} with broker epoch ${_brokerEpoch}")
          scheduleNextCommunicationImmediately() // Immediately send a heartbeat
        } else {
          info(s"Unable to register broker ${nodeId} because the controller returned " +
            s"error ${errorCode}")
          scheduleNextCommunicationAfterFailure()
        }
      }
    }

    override def onTimeout(): Unit = {
      info(s"Unable to register the broker because the RPC got timed out before it could be sent.")
      scheduleNextCommunicationAfterFailure()
    }
  }

  private def sendBrokerHeartbeat(): Unit = {
    val metadataOffset = _highestMetadataOffsetProvider()
    val data = new BrokerHeartbeatRequestData().
      setBrokerEpoch(_brokerEpoch).
      setBrokerId(nodeId).
      setCurrentMetadataOffset(metadataOffset).
      setWantFence(!readyToUnfence).
      setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
    if (isTraceEnabled) {
      trace(s"Sending broker heartbeat ${data}")
    }
    _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data),
      new BrokerHeartbeatResponseHandler())
  }

  private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler {
    override def onComplete(response: ClientResponse): Unit = {
      if (response.authenticationException() != null) {
        error(s"Unable to send broker heartbeat for ${nodeId} because of an " +
          "authentication exception.", response.authenticationException());
        scheduleNextCommunicationAfterFailure()
      } else if (response.versionMismatch() != null) {
        error(s"Unable to send broker heartbeat for ${nodeId} because of an API " +
          "version problem.", response.versionMismatch());
        scheduleNextCommunicationAfterFailure()
      } else if (response.responseBody() == null) {
        warn(s"Unable to send broker heartbeat for ${nodeId}. Retrying.")
        scheduleNextCommunicationAfterFailure()
      } else if (!response.responseBody().isInstanceOf[BrokerHeartbeatResponse]) {
        error(s"Unable to send broker heartbeat for ${nodeId} because the controller " +
          "returned an invalid response type.")
        scheduleNextCommunicationAfterFailure()
      } else {
        val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
        val errorCode = Errors.forCode(message.data().errorCode())
        if (errorCode == Errors.NONE) {
          failedAttempts = 0
          _state match {
            case BrokerState.STARTING =>
              if (message.data().isCaughtUp()) {
                info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
                _state = BrokerState.RECOVERY
                initialCatchUpFuture.complete(null)
              } else {
                info(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
              }
              // Schedule the heartbeat after only 10 ms so that in the case where
              // there is no recovery work to be done, we start up a bit quicker.
              scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
            case BrokerState.RECOVERY =>
              if (!message.data().isFenced()) {
                info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
                _state = BrokerState.RUNNING
              } else {
                info(s"The broker is in RECOVERY.")
              }
              scheduleNextCommunicationAfterSuccess()
            case BrokerState.RUNNING =>
              debug(s"The broker is RUNNING. Processing heartbeat response.")
              scheduleNextCommunicationAfterSuccess()
            case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
              if (!message.data().shouldShutDown()) {
                info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
                  "for the active controller.")
                if (!gotControlledShutdownResponse) {
                  // If this is the first pending controlled shutdown response we got,
                  // schedule our next heartbeat a little bit sooner than we usually would.
                  // In the case where controlled shutdown completes quickly, this will
                  // speed things up a little bit.
                  scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
                } else {
                  scheduleNextCommunicationAfterSuccess()
                }
              } else {
                info(s"The controller has asked us to exit controlled shutdown.")
                beginShutdown()
              }
              gotControlledShutdownResponse = true
            case BrokerState.SHUTTING_DOWN =>
              info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
            case _ =>
              error(s"Unexpected broker state ${_state}")
              scheduleNextCommunicationAfterSuccess()
          }
        } else {
          warn(s"Broker ${nodeId} sent a heartbeat request but received error ${errorCode}.")
          scheduleNextCommunicationAfterFailure()
        }
      }
    }

    override def onTimeout(): Unit = {
      info("Unable to send a heartbeat because the RPC got timed out before it could be sent.")
      scheduleNextCommunicationAfterFailure()
    }
  }

  private def scheduleNextCommunicationImmediately(): Unit = scheduleNextCommunication(0)

  private def scheduleNextCommunicationAfterFailure(): Unit = {
    val delayMs = resendExponentialBackoff.backoff(failedAttempts)
    failedAttempts = failedAttempts + 1
    scheduleNextCommunication(NANOSECONDS.convert(delayMs, MILLISECONDS))
  }

  private def scheduleNextCommunicationAfterSuccess(): Unit = {
    scheduleNextCommunication(NANOSECONDS.convert(
      config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS))
  }

  private def scheduleNextCommunication(intervalNs: Long): Unit = {
    trace(s"Scheduling next communication at ${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " +
      "ms from now.")
    val deadlineNs = time.nanoseconds() + intervalNs
    eventQueue.scheduleDeferred("communication",
      new DeadlineFunction(deadlineNs),
      new CommunicationEvent())
  }

  private class RegistrationTimeoutEvent extends EventQueue.Event {
    override def run(): Unit = {
      if (!initialRegistrationSucceeded) {
        error("Shutting down because we were unable to register with the controller quorum.")
        eventQueue.beginShutdown("registrationTimeout", new ShutdownEvent())
      }
    }
  }

  private class CommunicationEvent extends EventQueue.Event {
    override def run(): Unit = {
      // Until registration succeeds, every scheduled communication retries registration;
      // afterwards, it sends a heartbeat.
      if (registered) {
        sendBrokerHeartbeat()
      } else {
        sendBrokerRegistration()
      }
    }
  }

  private class ShutdownEvent extends EventQueue.Event {
    override def run(): Unit = {
      info(s"Transitioning from ${_state} to ${BrokerState.SHUTTING_DOWN}.")
      _state = BrokerState.SHUTTING_DOWN
      controlledShutdownFuture.complete(null)
      // Cancel the catch-up future so waiters are not blocked forever if we never caught up.
      initialCatchUpFuture.cancel(false)
      if (_channelManager != null) {
        _channelManager.shutdown()
        _channelManager = null
      }
    }
  }
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2db02a1..0ecb48c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -70,6 +70,9 @@ object Defaults {
   val BackgroundThreads = 10
   val QueuedMaxRequests = 500
   val QueuedMaxRequestBytes = -1
+  val InitialBrokerRegistrationTimeoutMs = 60000
+  val BrokerHeartbeatIntervalMs = 2000
+  val BrokerSessionTimeoutMs = 18000
 
   /** KIP-500 Configuration */
   val EmptyNodeId: Int = -1
@@ -369,6 +372,9 @@ object KafkaConfig {
 
   /** KIP-500 Configuration */
   val ProcessRolesProp = "process.roles"
+  val InitialBrokerRegistrationTimeoutMs = "initial.broker.registration.timeout.ms"
+  val BrokerHeartbeatIntervalMsProp = "broker.heartbeat.interval.ms"
+  val BrokerSessionTimeoutMsProp = "broker.session.timeout.ms"
   val NodeIdProp = "node.id"
   val MetadataLogDirProp = "metadata.log.dir"
 
@@ -659,6 +665,9 @@ object KafkaConfig {
   val ProcessRolesDoc = "The roles that this process plays: 'broker', 'controller', or 'broker,controller'
if it is both. " +
     "This configuration is only for clusters upgraded for KIP-500, which replaces the dependence
on Zookeeper with " +
     "a self-managed Raft quorum. Leave this config undefined or empty for Zookeeper clusters."
+  val InitialBrokerRegistrationTimeoutMsDoc = "When initially registering with the controller
quorum, the number of milliseconds to wait before declaring failure and exiting the broker
process."
+  val BrokerHeartbeatIntervalMsDoc = "The length of time in milliseconds between broker heartbeats.
Used when running in KIP-500 mode."
+  val BrokerSessionTimeoutMsDoc = "The length of time in milliseconds that a broker lease
lasts if no heartbeats are made. Used when running in KIP-500 mode."
   val NodeIdDoc = "The node ID associated with the roles this process is playing when `process.roles`
is non-empty. " +
     "This is required configuration when the self-managed quorum is enabled."
   val MetadataLogDirDoc = "This configuration determines where we put the metadata log for
clusters upgraded to " +
@@ -1064,6 +1073,9 @@ object KafkaConfig {
        */
       .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker",
"controller"), HIGH, ProcessRolesDoc)
       .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, NodeIdDoc)
+      .defineInternal(InitialBrokerRegistrationTimeoutMs, INT, Defaults.InitialBrokerRegistrationTimeoutMs,
null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
+      .defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs,
null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
+      .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null,
MEDIUM, BrokerSessionTimeoutMsDoc)
       .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
 
       /************* Authorizer Configuration ***********/
@@ -1497,6 +1509,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
   val processRoles: Set[ProcessRole] = parseProcessRoles()
+  val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs)
+  val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
+  val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
 
   def requiresZookeeper: Boolean = processRoles.isEmpty
   def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
new file mode 100644
index 0000000..a823ce6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import java.util.{Collections, Properties}
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.clients.{Metadata, MockClient, NodeApiVersions}
+import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
+import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.ApiKeys.{BROKER_HEARTBEAT, BROKER_REGISTRATION}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{BrokerHeartbeatResponse, BrokerRegistrationResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.{Assertions, Test, Timeout}
+
+import scala.jdk.CollectionConverters._
+
+
/**
 * Tests for the broker lifecycle manager.  These tests drive the manager with a MockTime
 * and a MockClient so that registration and heartbeat RPCs to the controller can be
 * simulated deterministically, without a real network or controller.
 */
@Timeout(value = 12)
class BrokerLifecycleManagerTest {
  // Minimal KIP-500 "broker"-role configuration.  The initial registration timeout is
  // set very high so that only an explicit MockTime sleep can trigger the timeout path.
  def configProperties = {
    val properties = new Properties()
    properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
    properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
    properties.setProperty(KafkaConfig.NodeIdProp, "1")
    properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMs, "300000")
    properties
  }

  // A ControllerNodeProvider whose controller node the test can swap in atomically.
  class SimpleControllerNodeProvider extends ControllerNodeProvider {
    val node = new AtomicReference[Node](null)

    override def get(): Option[Node] = Option(node.get())

    override def listenerName: ListenerName = new ListenerName("PLAINTEXT")

    override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
  }

  // Shared fixture wiring together the manager's collaborators: a mock clock, a mock
  // network client, and a mock broker-to-controller channel manager.
  class BrokerLifecycleManagerTestContext(properties: Properties) {
    val config = new KafkaConfig(properties)
    val time = new MockTime(1, 1)
    val highestMetadataOffset = new AtomicLong(0)
    val metadata = new Metadata(1000, 1000, new LogContext(), new ClusterResourceListeners())
    val mockClient = new MockClient(time, metadata)
    val controllerNodeProvider = new SimpleControllerNodeProvider()
    // Advertise only the two APIs the lifecycle manager uses: registration and heartbeat.
    val nodeApiVersions = new NodeApiVersions(Seq(BROKER_REGISTRATION, BROKER_HEARTBEAT).map {
      apiKey => new ApiVersion().setApiKey(apiKey.id).
        setMinVersion(apiKey.oldestVersion()).setMaxVersion(apiKey.latestVersion())
    }.toList.asJava)
    val mockChannelManager = new MockBrokerToControllerChannelManager(mockClient,
      time, controllerNodeProvider, nodeApiVersions)
    val clusterId = Uuid.fromString("x4AJGXQSRnephtTZzujw4w")
    // Translate the configured advertised endpoints into the registration-request format.
    val advertisedListeners = new ListenerCollection()
    config.advertisedListeners.foreach { ep =>
      advertisedListeners.add(new Listener().setHost(ep.host).
        setName(ep.listenerName.value()).
        setPort(ep.port.shortValue()).
        setSecurityProtocol(ep.securityProtocol.id))
    }

    // Drive one round of mock network I/O.
    def poll(): Unit = {
      mockClient.wakeup()
      mockChannelManager.poll()
    }
  }

  @Test
  def testCreateAndClose(): Unit = {
    // The manager must shut down cleanly even if it was never started.
    val context = new BrokerLifecycleManagerTestContext(configProperties)
    val manager = new BrokerLifecycleManager(context.config, context.time, None)
    manager.close()
  }

  @Test
  def testCreateStartAndClose(): Unit = {
    // start() should move the broker from NOT_RUNNING to STARTING; close() should
    // leave it in SHUTTING_DOWN.
    val context = new BrokerLifecycleManagerTestContext(configProperties)
    val manager = new BrokerLifecycleManager(context.config, context.time, None)
    Assertions.assertEquals(BrokerState.NOT_RUNNING, manager.state())
    manager.start(() => context.highestMetadataOffset.get(),
      context.mockChannelManager, context.clusterId, context.advertisedListeners,
      Collections.emptyMap())
    TestUtils.retry(60000) {
      Assertions.assertEquals(BrokerState.STARTING, manager.state())
    }
    manager.close()
    Assertions.assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
  }

  @Test
  def testSuccessfulRegistration(): Unit = {
    // A successful registration response should make the broker adopt the epoch the
    // controller assigned it.
    val context = new BrokerLifecycleManagerTestContext(configProperties)
    val manager = new BrokerLifecycleManager(context.config, context.time, None)
    val controllerNode = new Node(3000, "localhost", 8021)
    context.controllerNodeProvider.node.set(controllerNode)
    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
    manager.start(() => context.highestMetadataOffset.get(),
      context.mockChannelManager, context.clusterId, context.advertisedListeners,
      Collections.emptyMap())
    TestUtils.retry(10000) {
      context.poll()
      Assertions.assertEquals(1000L, manager.brokerEpoch())
    }
    manager.close()
  }

  @Test
  def testRegistrationTimeout(): Unit = {
    // If every registration attempt fails, the manager should retry with backoff and
    // finally shut itself down once the initial registration timeout elapses.
    val context = new BrokerLifecycleManagerTestContext(configProperties)
    val controllerNode = new Node(3000, "localhost", 8021)
    val manager = new BrokerLifecycleManager(context.config, context.time, None)
    context.controllerNodeProvider.node.set(controllerNode)
    // Queue up a registration response carrying DUPLICATE_BROKER_REGISTRATION, which the
    // manager treats as a retriable failure.
    def newDuplicateRegistrationResponse(): Unit = {
      context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
        new BrokerRegistrationResponseData().
          setErrorCode(Errors.DUPLICATE_BROKER_REGISTRATION.code())), controllerNode)
      context.mockChannelManager.poll()
    }
    newDuplicateRegistrationResponse()
    Assertions.assertEquals(1, context.mockClient.futureResponses().size)
    manager.start(() => context.highestMetadataOffset.get(),
      context.mockChannelManager, context.clusterId, context.advertisedListeners,
      Collections.emptyMap())
    // We should send the first registration request and get a failure immediately
    TestUtils.retry(60000) {
      context.poll()
      Assertions.assertEquals(0, context.mockClient.futureResponses().size)
    }
    // Verify that we resend the registration request.
    newDuplicateRegistrationResponse()
    TestUtils.retry(60000) {
      context.time.sleep(100)
      context.poll()
      manager.eventQueue.wakeup()
      Assertions.assertEquals(0, context.mockClient.futureResponses().size)
    }
    // Verify that we time out eventually.
    context.time.sleep(300000)
    TestUtils.retry(60000) {
      context.poll()
      manager.eventQueue.wakeup()
      Assertions.assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
      // Shutdown cancels initialCatchUpFuture, so it completes exceptionally.
      Assertions.assertTrue(manager.initialCatchUpFuture.isCompletedExceptionally())
      Assertions.assertEquals(-1L, manager.brokerEpoch())
    }
    manager.close()
  }

  @Test
  def testControlledShutdown(): Unit = {
    // Walk the broker through its full lifecycle: STARTING -> RECOVERY (caught up) ->
    // RUNNING (unfenced) -> PENDING_CONTROLLED_SHUTDOWN -> SHUTTING_DOWN.
    val context = new BrokerLifecycleManagerTestContext(configProperties)
    val manager = new BrokerLifecycleManager(context.config, context.time, None)
    val controllerNode = new Node(3000, "localhost", 8021)
    context.controllerNodeProvider.node.set(controllerNode)
    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
    context.mockClient.prepareResponseFrom(new BrokerHeartbeatResponse(
      new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode)
    manager.start(() => context.highestMetadataOffset.get(),
      context.mockChannelManager, context.clusterId, context.advertisedListeners,
      Collections.emptyMap())
    TestUtils.retry(10000) {
      context.poll()
      manager.eventQueue.wakeup()
      Assertions.assertEquals(BrokerState.RECOVERY, manager.state())
    }
    // An unfenced heartbeat response moves the broker to RUNNING.
    context.mockClient.prepareResponseFrom(new BrokerHeartbeatResponse(
      new BrokerHeartbeatResponseData().setIsFenced(false)), controllerNode)
    context.time.sleep(20)
    TestUtils.retry(10000) {
      context.poll()
      manager.eventQueue.wakeup()
      Assertions.assertEquals(BrokerState.RUNNING, manager.state())
    }
    manager.beginControlledShutdown()
    TestUtils.retry(10000) {
      context.poll()
      manager.eventQueue.wakeup()
      Assertions.assertEquals(BrokerState.PENDING_CONTROLLED_SHUTDOWN, manager.state())
    }
    // Once the controller says shouldShutDown, the broker proceeds to SHUTTING_DOWN.
    context.mockClient.prepareResponseFrom(new BrokerHeartbeatResponse(
      new BrokerHeartbeatResponseData().setShouldShutDown(true)), controllerNode)
    context.time.sleep(3000)
    TestUtils.retry(10000) {
      context.poll()
      manager.eventQueue.wakeup()
      Assertions.assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
    }
    manager.controlledShutdownFuture.get()
    manager.close()
  }
}


Mime
View raw message