kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 3.0 updated: KAFKA-13219: BrokerState metric not working for KRaft clusters (#11239)
Date Mon, 23 Aug 2021 21:05:11 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new b4529ac  KAFKA-13219: BrokerState metric not working for KRaft clusters (#11239)
b4529ac is described below

commit b4529ac6dab00dd5003d27e9288e50b7970133d1
Author: Ron Dagostino <rdagostino@confluent.io>
AuthorDate: Mon Aug 23 16:45:44 2021 -0400

    KAFKA-13219: BrokerState metric not working for KRaft clusters (#11239)
    
    The BrokerState metric always has a value of 0, for NOT_RUNNING, in KRaft
    clusters. This patch fixes it and adds a test.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../scala/kafka/server/BrokerLifecycleManager.scala  |  8 ++++----
 core/src/main/scala/kafka/server/BrokerServer.scala  |  8 ++++----
 core/src/main/scala/kafka/server/KafkaBroker.scala   |  4 +---
 core/src/main/scala/kafka/server/KafkaServer.scala   | 15 +++++++++------
 .../test/junit/RaftClusterInvocationContext.java     |  2 +-
 .../integration/kafka/server/KRaftClusterTest.scala  | 12 ++++++------
 .../kafka/server/BrokerLifecycleManagerTest.scala    | 20 ++++++++++----------
 7 files changed, 35 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index ae9634b..e15a3e6 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -166,12 +166,12 @@ class BrokerLifecycleManager(val config: KafkaConfig,
    * The channel manager, or null if this manager has not been started yet.  This variable
    * can only be read or written from the event queue thread.
    */
-  var _channelManager: BrokerToControllerChannelManager = _
+  private var _channelManager: BrokerToControllerChannelManager = _
 
   /**
    * The event queue.
    */
-  val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
+  private[server] val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
 
   /**
    * Start the BrokerLifecycleManager.
@@ -193,9 +193,9 @@ class BrokerLifecycleManager(val config: KafkaConfig,
     eventQueue.append(new SetReadyToUnfenceEvent())
   }
 
-  def brokerEpoch(): Long = _brokerEpoch
+  def brokerEpoch: Long = _brokerEpoch
 
-  def state(): BrokerState = _state
+  def state: BrokerState = _state
 
   private class BeginControlledShutdownEvent extends EventQueue.Event {
     override def run(): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d6e82e7..d2079c4 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -84,6 +84,8 @@ class BrokerServer(
   val supportedFeatures: util.Map[String, VersionRange]
 ) extends KafkaBroker {
 
+  override def brokerState: BrokerState = lifecycleManager.state
+
   import kafka.server.Server._
 
   private val logContext: LogContext = new LogContext(s"[BrokerServer id=${config.nodeId}] ")
@@ -248,7 +250,7 @@ class BrokerServer(
         scheduler = kafkaScheduler,
         time = time,
         brokerId = config.nodeId,
-        brokerEpochSupplier = () => lifecycleManager.brokerEpoch()
+        brokerEpochSupplier = () => lifecycleManager.brokerEpoch
       )
       alterIsrManager.start()
 
@@ -270,7 +272,7 @@ class BrokerServer(
 
       val producerIdManagerSupplier = () => ProducerIdManager.rpc(
         config.brokerId,
-        brokerEpochSupplier = () => lifecycleManager.brokerEpoch(),
+        brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
         clientToControllerChannelManager,
         config.requestTimeoutMs
       )
@@ -516,6 +518,4 @@ class BrokerServer(
 
   def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
 
-  def currentState(): BrokerState = lifecycleManager.state()
-
 }
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 9b9fc97..91d8a77 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -63,11 +63,9 @@ object KafkaBroker {
 }
 
 trait KafkaBroker extends KafkaMetricsGroup {
-  @volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
 
   def authorizer: Option[Authorizer]
-  def brokerState: BrokerState = _brokerState
-  protected def brokerState_= (brokerState: BrokerState): Unit = _brokerState = brokerState
+  def brokerState: BrokerState
   def clusterId: String
   def config: KafkaConfig
   def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a565e1..b3c66a6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -96,6 +96,7 @@ class KafkaServer(
   private val isShuttingDown = new AtomicBoolean(false)
   private val isStartingUp = new AtomicBoolean(false)
 
+  @volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
   private var shutdownLatch = new CountDownLatch(1)
   private var logContext: LogContext = null
 
@@ -161,6 +162,8 @@ class KafkaServer(
   val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
   val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
 
+  override def brokerState: BrokerState = _brokerState
+
   def clusterId: String = _clusterId
 
   // Visible for testing
@@ -188,7 +191,7 @@ class KafkaServer(
 
       val canStartup = isStartingUp.compareAndSet(false, true)
       if (canStartup) {
-        brokerState = BrokerState.STARTING
+        _brokerState = BrokerState.STARTING
 
         /* setup zookeeper */
         initZkClient(time)
@@ -250,7 +253,7 @@ class KafkaServer(
         logManager = LogManager(config, initialOfflineDirs,
           new ZkConfigRepository(new AdminZkClient(zkClient)),
           kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, config.usesTopicId)
-        brokerState = BrokerState.RECOVERY
+        _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
 
         metadataCache = MetadataCache.zkMetadataCache(config.brokerId)
@@ -418,7 +421,7 @@ class KafkaServer(
 
         socketServer.startProcessingRequests(authorizerFutures)
 
-        brokerState = BrokerState.RUNNING
+        _brokerState = BrokerState.RUNNING
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)
@@ -631,7 +634,7 @@ class KafkaServer(
       // the shutdown.
       info("Starting controlled shutdown")
 
-      brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
+      _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
 
       val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
 
@@ -656,7 +659,7 @@ class KafkaServer(
       // `true` at the end of this method.
       if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
         CoreUtils.swallow(controlledShutdown(), this)
-        brokerState = BrokerState.SHUTTING_DOWN
+        _brokerState = BrokerState.SHUTTING_DOWN
 
         if (dynamicConfigManager != null)
           CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@@ -726,7 +729,7 @@ class KafkaServer(
         // Clear all reconfigurable instances stored in DynamicBrokerConfig
         config.dynamicConfig.clear()
 
-        brokerState = BrokerState.NOT_RUNNING
+        _brokerState = BrokerState.NOT_RUNNING
 
         startupComplete.set(false)
         isShuttingDown.set(false)
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index fc6b557..c60e0ec 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -92,7 +92,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
                 cluster.format();
                 cluster.startup();
                 kafka.utils.TestUtils.waitUntilTrue(
-                    () -> cluster.brokers().get(0).currentState() == BrokerState.RUNNING,
+                    () -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
                     () -> "Broker never made it to RUNNING state.",
                     org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
                     100L);
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 39afbe4..b7ca0f7 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -30,9 +30,9 @@ import org.apache.kafka.common.requests.{DescribeClusterRequest, DescribeCluster
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Tag, Test, Timeout}
+
 import java.util
 import java.util.{Arrays, Collections, Optional}
-
 import scala.collection.mutable
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
@@ -64,7 +64,7 @@ class KRaftClusterTest {
     try {
       cluster.format()
       cluster.startup()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING,
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
         "Broker never made it to RUNNING state.")
       TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
         "RaftManager was not initialized.")
@@ -90,7 +90,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING,
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
         "Broker never made it to RUNNING state.")
       TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
         "RaftManager was not initialized.")
@@ -127,7 +127,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING,
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
         "Broker never made it to RUNNING state.")
       TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
         "RaftManager was not initialized.")
@@ -160,7 +160,7 @@ class KRaftClusterTest {
     try {
       cluster.format()
       cluster.startup()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING,
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
         "Broker never made it to RUNNING state.")
       val admin = Admin.create(cluster.clientProperties())
       try {
@@ -317,7 +317,7 @@ class KRaftClusterTest {
   private def waitForRunningBrokers(count: Int, waitTime: FiniteDuration)
                                    (implicit cluster: KafkaClusterTestKit): Seq[BrokerServer] = {
     def getRunningBrokerServers: Seq[BrokerServer] = cluster.brokers.values.asScala.toSeq
-      .filter(brokerServer => brokerServer.currentState() == BrokerState.RUNNING)
+      .filter(brokerServer => brokerServer.brokerState == BrokerState.RUNNING)
 
     val (runningBrokerServers, hasRunningBrokers) = TestUtils.computeUntilTrue(getRunningBrokerServers, waitTime.toMillis)(_.nonEmpty)
     assertTrue(hasRunningBrokers,
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index a551288..d97724d 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -104,15 +104,15 @@ class BrokerLifecycleManagerTest {
   def testCreateStartAndClose(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
     val manager = new BrokerLifecycleManager(context.config, context.time, None)
-    assertEquals(BrokerState.NOT_RUNNING, manager.state())
+    assertEquals(BrokerState.NOT_RUNNING, manager.state)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, context.advertisedListeners,
       Collections.emptyMap())
     TestUtils.retry(60000) {
-      assertEquals(BrokerState.STARTING, manager.state())
+      assertEquals(BrokerState.STARTING, manager.state)
     }
     manager.close()
-    assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
+    assertEquals(BrokerState.SHUTTING_DOWN, manager.state)
   }
 
   @Test
@@ -128,7 +128,7 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap())
     TestUtils.retry(10000) {
       context.poll()
-      assertEquals(1000L, manager.brokerEpoch())
+      assertEquals(1000L, manager.brokerEpoch)
     }
     manager.close()
 
@@ -169,9 +169,9 @@ class BrokerLifecycleManagerTest {
     TestUtils.retry(60000) {
       context.poll()
       manager.eventQueue.wakeup()
-      assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
+      assertEquals(BrokerState.SHUTTING_DOWN, manager.state)
       assertTrue(manager.initialCatchUpFuture.isCompletedExceptionally())
-      assertEquals(-1L, manager.brokerEpoch())
+      assertEquals(-1L, manager.brokerEpoch)
     }
     manager.close()
   }
@@ -192,7 +192,7 @@ class BrokerLifecycleManagerTest {
     TestUtils.retry(10000) {
       context.poll()
       manager.eventQueue.wakeup()
-      assertEquals(BrokerState.RECOVERY, manager.state())
+      assertEquals(BrokerState.RECOVERY, manager.state)
     }
     context.mockClient.prepareResponseFrom(new BrokerHeartbeatResponse(
       new BrokerHeartbeatResponseData().setIsFenced(false)), controllerNode)
@@ -200,13 +200,13 @@ class BrokerLifecycleManagerTest {
     TestUtils.retry(10000) {
       context.poll()
       manager.eventQueue.wakeup()
-      assertEquals(BrokerState.RUNNING, manager.state())
+      assertEquals(BrokerState.RUNNING, manager.state)
     }
     manager.beginControlledShutdown()
     TestUtils.retry(10000) {
       context.poll()
       manager.eventQueue.wakeup()
-      assertEquals(BrokerState.PENDING_CONTROLLED_SHUTDOWN, manager.state())
+      assertEquals(BrokerState.PENDING_CONTROLLED_SHUTDOWN, manager.state)
       assertTrue(context.mockClient.hasInFlightRequests)
     }
 
@@ -226,7 +226,7 @@ class BrokerLifecycleManagerTest {
     TestUtils.retry(10000) {
       context.poll()
       manager.eventQueue.wakeup()
-      assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
+      assertEquals(BrokerState.SHUTTING_DOWN, manager.state)
     }
     manager.controlledShutdownFuture.get()
     manager.close()

Mime
View raw message