kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 1.1 updated: MINOR: Clean up to avoid errors in dynamic broker config tests (#5486)
Date Thu, 16 Aug 2018 16:37:17 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new d5574eb  MINOR: Clean up to avoid errors in dynamic broker config tests (#5486)
d5574eb is described below

commit d5574ebd9b72b88c33c221a6a73956594d72c3fa
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Aug 14 17:14:48 2018 +0100

    MINOR: Clean up to avoid errors in dynamic broker config tests (#5486)
    
    Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Viktor Somogyi <viktorsomogyi@gmail.com>,
Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/server/DynamicBrokerConfig.scala   | 12 ++++++++++++
 core/src/main/scala/kafka/server/KafkaServer.scala           |  3 +++
 .../kafka/server/DynamicBrokerReconfigurationTest.scala      |  2 +-
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 766907a..5aaec89 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -153,6 +153,18 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
     updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
   }
 
+  /**
+   * Clear all cached values. This is used to clear state on broker shutdown to avoid
+   * exceptions in tests when broker is restarted. These fields are re-initialized when
+   * broker starts up.
+   */
+  private[server] def clear(): Unit = {
+    dynamicBrokerConfigs.clear()
+    dynamicDefaultConfigs.clear()
+    reconfigurables.clear()
+    brokerReconfigurables.clear()
+  }
+
   def addReconfigurables(kafkaServer: KafkaServer): Unit = {
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     if (kafkaServer.logManager.cleaner != null)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 774bcb4..c8467f9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -608,6 +608,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         if (brokerTopicStats != null)
           CoreUtils.swallow(brokerTopicStats.close(), this)
 
+        // Clear all reconfigurable instances stored in DynamicBrokerConfig
+        config.dynamicConfig.clear()
+
         brokerState.newState(NotRunning)
 
         startupComplete.set(false)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index e0ab55c..5ef7cd2 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -125,10 +125,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness
with SaslSet
       servers += TestUtils.createServer(kafkaConfig)
     }
 
+    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers,
servers)
     TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
       replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
 
-    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers,
servers)
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
 
     TestMetricsReporter.testReporters.clear()


Mime
View raw message