kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-9254; Overridden topic configs are reset after dynamic default change (#7870)
Date Sat, 25 Jan 2020 01:24:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 0e7f867  KAFKA-9254; Overridden topic configs are reset after dynamic default change
(#7870)
0e7f867 is described below

commit 0e7f867041959c5d77727c7f5ce32d363fa09fc2
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Sat Jan 25 06:47:22 2020 +0800

    KAFKA-9254; Overridden topic configs are reset after dynamic default change (#7870)
    
    Currently, when a dynamic change is made to the broker-level default log configuration,
existing log configs are recreated with an empty set of overridden configs. As a result, when
dynamic broker configs are updated a second time, the topic-level configs are lost. This can
cause unexpected data loss, for example, if the cleanup policy changes from "compact" to "delete".
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 36 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 4bd0be7..e862326 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -552,7 +552,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends
Reco
       props ++= newBrokerDefaults.asScala
       props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
 
-      val logConfig = LogConfig(props.asJava)
+      val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs)
       log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
     }
     if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable)
{
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 45fbcba..0a0dfcd 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -345,6 +345,40 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
   }
 
   @Test
+  def testConsecutiveConfigChange(): Unit = {
+    val topic2 = "testtopic2"
+    val topicProps = new Properties
+    topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
+    TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps)
+    var log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw
new IllegalStateException("Log not found"))
+    assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
+    assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
+
+    val props = new Properties
+    props.put(KafkaConfig.MinInSyncReplicasProp, "3")
+    // Make a broker-default config change
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MinInSyncReplicasProp,
"3"))
+    // Verify that all broker defaults have been updated
+    servers.foreach { server =>
+      props.asScala.foreach { case (k, v) =>
+        assertEquals(s"Not reconfigured $k", v, server.config.originals.get(k).toString)
+      }
+    }
+
+    log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new
IllegalStateException("Log not found"))
+    assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
+    assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
// Verify topic-level config survives
+
+    // Make a second broker-default change
+    props.clear()
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000")
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp,
"604800000"))
+    log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new
IllegalStateException("Log not found"))
+    assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
+    assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
// Verify topic-level config still survives
+  }
+
+  @Test
   def testDefaultTopicConfig(): Unit = {
     val (producerThread, consumerThread) = startProduceConsume(retries = 0)
 
@@ -607,7 +641,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
     val partitions = (0 until numPartitions).map(i => new TopicPartition(topic, i)).filter
{ tp =>
       zkClient.getLeaderForPartition(tp).contains(leaderId)
     }
-    assertTrue(s"Partitons not found with leader $leaderId", partitions.nonEmpty)
+    assertTrue(s"Partitions not found with leader $leaderId", partitions.nonEmpty)
     partitions.foreach { tp =>
       (1 to 2).foreach { i =>
         val replicaFetcherManager = servers(i).replicaManager.replicaFetcherManager


Mime
View raw message