kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-7976; Update config before notifying controller of unclean leader update (#6426)
Date Tue, 12 Mar 2019 09:53:18 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new c43b914  KAFKA-7976; Update config before notifying controller of unclean leader
update (#6426)
c43b914 is described below

commit c43b914d909ab5460562f4c741b332f2a114a3a5
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Mar 12 09:47:05 2019 +0000

    KAFKA-7976; Update config before notifying controller of unclean leader update (#6426)
    
    When unclean leader election is enabled dynamically on brokers, we notify controller of
the update before updating KafkaConfig. When processing this event, controller's decision
to elect unclean leaders is based on the current KafkaConfig, so there is a small timing window
when the controller may not elect unclean leader because KafkaConfig of the server was not
yet updated. The commit fixes this timing window by using the existing BrokerReconfigurable
interface used by other classes  [...]
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 31 +++++++++++++++-------
 .../server/DynamicBrokerReconfigurationTest.scala  |  4 ++-
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 1c56572..b02b842 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -204,13 +204,25 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
     brokerReconfigurables.clear()
   }
 
+  /**
+   * Add reconfigurables to be notified when a dynamic broker config is updated.
+   *
+   * `Reconfigurable` is the public API used by configurable plugins like metrics reporter
+   * and quota callbacks. These are reconfigured before `KafkaConfig` is updated so that
+   * the update can be aborted if `reconfigure()` fails with an exception.
+   *
+   * `BrokerReconfigurable` is used for internal reconfigurable classes. These are
+   * reconfigured after `KafkaConfig` is updated so that they can access `KafkaConfig`
+   * directly. They are provided both old and new configs.
+   */
   def addReconfigurables(kafkaServer: KafkaServer): Unit = {
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
+
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
-    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
-    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
-    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
+    addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
     addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
   }
@@ -565,25 +577,24 @@ object DynamicLogConfig {
   val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs
   val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) =>
(v, k) }
 }
-class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable
with Logging {
 
-  override def configure(configs: util.Map[String, _]): Unit = {}
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends BrokerReconfigurable
with Logging {
 
-  override def reconfigurableConfigs(): util.Set[String] = {
-    DynamicLogConfig.ReconfigurableConfigs.asJava
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicLogConfig.ReconfigurableConfigs
   }
 
-  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     // For update of topic config overrides, only config names and types are validated
     // Names and types have already been validated. For consistency with topic config
     // validation, no additional validation is performed.
   }
 
-  override def reconfigure(configs: util.Map[String, _]): Unit = {
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     val currentLogConfig = logManager.currentDefaultConfig
     val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
-    configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach {
case (k, v) =>
+    newConfig.valuesFromThisConfig.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach
{ case (k, v) =>
       if (v != null) {
         DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
           newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 798961e..c28fcea 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -391,7 +391,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
 
     // Verify that configs of existing logs have been updated
     val newLogConfig = LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
-    assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
+    TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig == newLogConfig,
+      "Config not updated in LogManager")
+
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw
new IllegalStateException("Log not found"))
     TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config
using defaults not updated")
     props.asScala.foreach { case (k, v) =>


Mime
View raw message