kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix brokerId passed to metrics reporters (#4497)
Date Thu, 01 Feb 2018 01:59:54 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c38a345  MINOR: Fix brokerId passed to metrics reporters (#4497)
c38a345 is described below

commit c38a34559fd078be3ed30162619078f129802353
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Wed Jan 31 17:59:49 2018 -0800

    MINOR: Fix brokerId passed to metrics reporters (#4497)
    
    Remove caching of brokerId in DynamicBrokerConfig constructor and delay initialization until brokerId is set in KafkaConfig.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../main/scala/kafka/server/DynamicBrokerConfig.scala    |  5 ++---
 core/src/main/scala/kafka/server/KafkaServer.scala       | 16 ++++++++--------
 .../kafka/server/DynamicBrokerReconfigurationTest.scala  |  3 +++
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 168654d..9c85f9b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -119,7 +119,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val brokerId = kafkaConfig.brokerId
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
@@ -128,7 +127,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private[server] def initialize(zkClient: KafkaZkClient): Unit = {
     val adminZkClient = new AdminZkClient(zkClient)
     updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
-    updateBrokerConfig(brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
+    updateBrokerConfig(kafkaConfig.brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString))
   }
 
   def addReconfigurables(kafkaServer: KafkaServer): Unit = {
@@ -136,7 +135,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
-    addReconfigurable(new DynamicMetricsReporters(brokerId, kafkaServer))
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 98e4877..747a0df 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -199,14 +199,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* setup zookeeper */
         initZkClient(time)
 
-        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
-        // applied after DynamicConfigManager starts.
-        config.dynamicConfig.initialize(zkClient)
-
-        /* start scheduler */
-        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
-        kafkaScheduler.startup()
-
         /* Get or create cluster_id */
         _clusterId = getOrGenerateClusterId(zkClient)
         info(s"Cluster ID = $clusterId")
@@ -217,6 +209,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
         this.logIdent = logContext.logPrefix
 
+        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
+        // applied after DynamicConfigManager starts.
+        config.dynamicConfig.initialize(zkClient)
+
+        /* start scheduler */
+        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+        kafkaScheduler.startup()
+
         /* create and configure metrics */
         val reporters = new util.ArrayList[MetricsReporter]
         reporters.add(new JmxReporter(jmxPrefix))
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 374aac2..49d9953 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -469,6 +469,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       assertFalse("No metrics found", reporter.kafkaMetrics.isEmpty)
       reporter.verifyMetricValue("request-total", "socket-server-metrics")
     }
+    assertEquals(servers.map(_.config.brokerId).toSet, TestMetricsReporter.configuredBrokers.toSet)
 
     val clientId = "test-client-1"
     val (producerThread, consumerThread) = startProduceConsume(retries = 0, clientId)
@@ -812,6 +813,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 object TestMetricsReporter {
   val PollingIntervalProp = "polling.interval"
   val testReporters = new ConcurrentLinkedQueue[TestMetricsReporter]()
+  val configuredBrokers = mutable.Set[Int]()
 
   def waitForReporters(count: Int): List[TestMetricsReporter] = {
     TestUtils.waitUntilTrue(() => testReporters.size == count, msg = "Metrics reporters not created")
@@ -839,6 +841,7 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
   }
 
   override def configure(configs: util.Map[String, _]): Unit = {
+    configuredBrokers += configs.get(KafkaConfig.BrokerIdProp).toString.toInt
     configureCount += 1
     pollingInterval = configs.get(PollingIntervalProp).toString.toInt
   }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message