kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] 01/02: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work (#9051)
Date Fri, 24 Jul 2020 19:08:59 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 67df232bec296eaf76650b01088049c11aa2a168
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Fri Jul 24 09:33:22 2020 +0800

    KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work (#9051)
    
    * KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
    
    https://issues.apache.org/jira/browse/KAFKA-10268
    
    Currently, ConfigCommand's --delete-config does not restore a config to its default value,
    at either the per-broker level or the cluster-default level. The Admin.incrementalAlterConfigs
    API runs into the same problem. This patch fixes it by removing the corresponding config
    from the newConfig properties when reconfiguring the dynamic broker config.
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  7 +++++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 24 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index ec8e00b..e71ac0b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -653,9 +653,12 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
     val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
     newConfig.valuesFromThisConfig.forEach { (k, v) =>
-      if (DynamicLogConfig.ReconfigurableConfigs.contains(k) && v != null) {
+      if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
         DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
-          newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
+          if (v == null)
+             newBrokerDefaults.remove(configName)
+          else
+            newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
         }
       }
     }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a51c014..8b1a648 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -622,12 +622,34 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // configuration across brokers, they can also be defined at per-broker level for testing
     props.clear()
     props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, TimeUnit.DAYS.toMillis(2).toString)
     alterConfigsOnServer(servers.head, props)
     assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
-    servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
+    assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.config.values.get(KafkaConfig.LogRetentionTimeMillisProp))
+    servers.tail.foreach { server =>
+      assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
+      assertEquals(1680000000L, server.config.values.get(KafkaConfig.LogRetentionTimeMillisProp))
+    }
 
     // Verify that produce/consume worked throughout this test without any retries in producer
     stopAndVerifyProduceConsume(producerThread, consumerThread)
+
+    // Verify that configuration at both per-broker level and default cluster level could be deleted and
+    // the default value should be restored
+    props.clear()
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, "")
+    props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "")
+    TestUtils.incrementalAlterConfigs(servers.take(1), adminClients.head, props, perBrokerConfig = true, opType = OpType.DELETE).all.get
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get
+    servers.foreach { server =>
+      waitForConfigOnServer(server, KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
+    }
+    servers.foreach { server =>
+      val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+      // Verify default values for these two configurations are restored on all brokers
+      TestUtils.waitUntilTrue(() => log.config.maxIndexSize == Defaults.LogIndexSizeMaxBytes && log.config.retentionMs == 1680000000L,
+        "Existing topic config using defaults not updated")
+    }
   }
 
   @Test


Mime
View raw message