kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-1398 dynamic config changes are broken.
Date Fri, 18 Apr 2014 16:29:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 97f13ec73 -> b9351e04f


KAFKA-1398 dynamic config changes are broken.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9351e04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9351e04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9351e04

Branch: refs/heads/trunk
Commit: b9351e04f0a26f2f9e4abc7ec526ed802495f991
Parents: 97f13ec
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Thu Apr 17 17:07:27 2014 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Thu Apr 17 20:31:07 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/TopicConfigManager.scala | 52 ++++++++++++--------
 .../kafka/server/DynamicConfigChangeTest.scala  | 35 +++++++++++++
 2 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b9351e04/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index d41fd33..4a4274e 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -59,7 +59,7 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
  */
 class TopicConfigManager(private val zkClient: ZkClient,
                          private val logManager: LogManager,
-                         private val changeExpirationMs: Long = 10*60*1000,
+                         private val changeExpirationMs: Long = 15*60*1000,
                          private val time: Time = SystemTime) extends Logging {
   private var lastExecutedChange = -1L
   
@@ -86,7 +86,7 @@ class TopicConfigManager(private val zkClient: ZkClient,
    */
   private def processConfigChanges(notifications: Seq[String]) {
     if (notifications.size > 0) {
-      info("Processing %d topic config change notification(s)...".format(notifications.size))
+      info("Processing config change notification(s)...")
       val now = time.milliseconds
       val logs = logManager.logsByTopicPartition.toBuffer
       val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
@@ -94,26 +94,38 @@ class TopicConfigManager(private val zkClient: ZkClient,
         val changeId = changeNumber(notification)
         if (changeId > lastExecutedChange) {
           val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
-          val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode)
-          val topic = topicJson.substring(1, topicJson.length - 1) // dequote
-          if (logsByTopic.contains(topic)) {
-            /* combine the default properties with the overrides in zk to create the new
LogConfig */
-            val props = new Properties(logManager.defaultConfig.toProps)
-            props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
-            val logConfig = LogConfig.fromProps(props)
-            for (log <- logsByTopic(topic))
-              log.config = logConfig
-            lastExecutedChange = changeId
-            info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId,
topic, props))
-          } else {
-            if (now - stat.getCtime > changeExpirationMs) {
-              /* this change is now obsolete, try to delete it unless it is the last change
left */
-              error("Ignoring topic config change %d for topic %s since the change has expired")
-            } else {
-              error("Ignoring topic config change %d for topic %s since the topic may have
been deleted")
+          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath
+ "/" + notification)
+          if(jsonOpt.isDefined) {
+            val json = jsonOpt.get
+            val topic = json.substring(1, json.length - 1) // hacky way to dequote
+            if (logsByTopic.contains(topic)) {
+              /* combine the default properties with the overrides in zk to create the new
LogConfig */
+              val props = new Properties(logManager.defaultConfig.toProps)
+              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
+              val logConfig = LogConfig.fromProps(props)
+              for (log <- logsByTopic(topic))
+                log.config = logConfig
+              info("Processed topic config change %d for topic %s, setting new config to
%s.".format(changeId, topic, props))
+              purgeObsoleteNotifications(now, notifications)
             }
-            ZkUtils.deletePath(zkClient, changeZnode)
           }
+          lastExecutedChange = changeId
+        }
+      }
+    }
+  }
+  
+  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
+    val eligible = notifications.sorted.dropRight(1) // never purge newest notification--we
need it for the seq number
+    for(notification <- eligible) {
+      val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath
+ "/" + notification)
+      if(jsonOpt.isDefined) {
+        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
+        if (now - stat.getCtime > changeExpirationMs) {
+          debug("Purging config change notification " + notification)
+          ZkUtils.deletePath(zkClient, changeZnode)
+        } else {
+          return
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9351e04/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
new file mode 100644
index 0000000..5a1d5cc
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -0,0 +1,35 @@
+package kafka.server
+
+import junit.framework.Assert._
+import java.util.Properties
+import java.io.File
+import org.junit.{After, Before, Test}
+import kafka.integration.KafkaServerTestHarness
+import kafka.utils._
+import kafka.common._
+import kafka.log.LogConfig
+import kafka.admin.AdminUtils
+import org.scalatest.junit.JUnit3Suite
+
+class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
+  
+  override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort)))
+
+  @Test
+  def testConfigChange() {
+    val oldVal = 100000
+    val newVal = 200000
+    val tp = TopicAndPartition("test", 0)
+    AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps)
+    TestUtils.retry(10000) {
+      val logOpt = this.servers(0).logManager.getLog(tp)
+      assertTrue(logOpt.isDefined)
+      assertEquals(oldVal, logOpt.get.config.flushInterval)
+    }
+    AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps)
+    TestUtils.retry(10000) {
+      assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval)
+    }
+  }
+
+}
\ No newline at end of file


Mime
View raw message