kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] 02/02: KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)
Date Tue, 05 May 2020 21:19:16 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e5b5cbea70f5e02767c6df84f4cc7fbfb9288db2
Author: Sanjana Kaundinya <skaundinya@gmail.com>
AuthorDate: Tue Mar 17 23:02:33 2020 -0700

    KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)
    
    * Broker throttles were incorrectly marked as sensitive configurations.  Fix this, so
that their values can be returned via DescribeConfigs as expected.
    
    * Previously, changes to broker configs that consisted only of deletions were ignored
by the brokers because of faulty delta calculation logic that didn't consider deletions as
changes, only alterations as changes.  Fix this and add a regression test.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>
    (cherry picked from commit 5fc3cd61fcb73da8b52f34b72fe6bb7457f46ce2)
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 33 ++++++---
 .../main/scala/kafka/server/DynamicConfig.scala    |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 22 +++++-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 79 +++++++++++++++++++++-
 4 files changed, 121 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 92aa048..2f47c21 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -468,10 +468,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
       reconfigurable.reconfigure(newConfig)
   }
 
-  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_,
_]): mutable.Map[String, _] = {
-    newProps.asScala.filter {
+  /**
+   * Returns the change in configurations between the new props and current props by returning
a
+   * map of the changed configs, as well as the set of deleted keys
+   */
+  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[String,
_]):
+  (mutable.Map[String, _], Set[String]) = {
+    val changeMap = newProps.asScala.filter {
       case (k, v) => v != currentProps.get(k)
     }
+    val deletedKeySet = currentProps.asScala.filter {
+      case (k, _) => !newProps.containsKey(k)
+    }.keySet
+    (changeMap, deletedKeySet)
   }
 
   /**
@@ -510,8 +519,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
 
   private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean):
(KafkaConfig, List[BrokerReconfigurable]) = {
     val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
-    val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
-    if (updatedMap.nonEmpty) {
+    val (changeMap, deletedKeySet) = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
+    if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
       try {
         val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig)
// non-Kafka configs
         newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
@@ -519,14 +528,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
           case listenerReconfigurable: ListenerReconfigurable =>
             processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs,
validateOnly, reloadOnly = false)
           case reconfigurable =>
-            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
-              processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig,
customConfigs, validateOnly)
+            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet,
deletedKeySet))
+              processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig,
customConfigs, validateOnly)
         }
 
         // BrokerReconfigurable updates are processed after config is updated. Only do the
validation here.
         val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
         brokerReconfigurables.foreach { reconfigurable =>
-          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet))
{
+          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet,
deletedKeySet)) {
             reconfigurable.validateReconfiguration(newConfig)
             if (!validateOnly)
               brokerReconfigurablesToUpdate += reconfigurable
@@ -544,8 +553,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
       (currentConfig, List.empty)
   }
 
-  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys:
Set[String]): Boolean = {
-    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
+  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys:
Set[String], deletedKeys: Set[String]): Boolean = {
+    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty ||
+      reconfigurableConfigs.asScala.intersect(deletedKeys).nonEmpty
   }
 
   private def processListenerReconfigurable(listenerReconfigurable: ListenerReconfigurable,
@@ -556,8 +566,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
     val listenerName = listenerReconfigurable.listenerName
     val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
     val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
-    val updatedKeys = updatedConfigs(newValues, oldValues).keySet
-    val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs,
updatedKeys)
+    val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues)
+    val updatedKeys = changeMap.keySet
+    val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs,
updatedKeys, deletedKeys)
     // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if
configs have changed
     if (reloadOnly != configsChanged)
       processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs,
validateOnly)
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e5974d3..b70579e 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -54,7 +54,7 @@ object DynamicConfig {
       s"This property can be only set dynamically. It is suggested that the limit be kept
above 1MB/s for accurate behaviour."
 
     //Definitions
-    private val brokerConfigDef = new ConfigDef()
+    val brokerConfigDef = new ConfigDef()
       //round minimum value down, to make it easier for users.
       .define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate,
atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
       .define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate,
atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index dc5c3c7..fe0f8df 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1229,9 +1229,13 @@ object KafkaConfig {
 
   def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
 
-  def configType(configName: String): Option[ConfigDef.Type] = {
-    def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
+  private def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
 
+  def configType(configName: String): Option[ConfigDef.Type] = {
+    val configType = configTypeExact(configName)
+    if (configType.isDefined) {
+      return configType
+    }
     typeOf(configName) match {
       case Some(t) => Some(t)
       case None =>
@@ -1239,6 +1243,20 @@ object KafkaConfig {
     }
   }
 
+  private def configTypeExact(exactName: String): Option[ConfigDef.Type] = {
+    val configType = typeOf(exactName).orNull
+    if (configType != null) {
+      Some(configType)
+    } else {
+      val configKey = DynamicConfig.Broker.brokerConfigDef.configKeys().get(exactName)
+      if (configKey != null) {
+        Some(configKey.`type`)
+      } else {
+        None
+      }
+    }
+  }
+
   def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
     // If we can't determine the config entry type, treat it as a sensitive config to be
safe
     configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c33df2d..4a27b2c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -27,7 +27,7 @@ import java.{time, util}
 
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
-import kafka.server.{Defaults, KafkaConfig, KafkaServer}
+import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestUtils}
 import kafka.zk.KafkaZkClient
@@ -1727,6 +1727,83 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest
{
   }
 
   @Test
+  def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = {
+    client = Admin.create(createConfig())
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"123"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"456"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
+        "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")))
+    }, "Expected to see the broker properties we just set", pause=25)
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
""),
+        AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"654"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"987"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
+        "654".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")) &&
+        "987".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"")))
+    }, "Expected to see the broker properties we just modified", pause=25)
+  }
+
+  @Test
+  def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = {
+    client = Admin.create(createConfig())
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"123"),
+        AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"456"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"789"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
+        "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")) &&
+        "789".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"")))
+    }, "Expected to see the broker properties we just set", pause=25)
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
""),
+        AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
""),
+          AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
""),
+          AlterConfigOp.OpType.DELETE)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
+        "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")) &&
+        "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"")))
+    }, "Expected to see the broker properties we just removed to be deleted", pause=25)
+  }
+
+  @Test
   def testInvalidIncrementalAlterConfigs(): Unit = {
     client = Admin.create(createConfig)
 


Mime
View raw message