kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4177; Remove ThrottledReplicationRateLimit from Server Config
Date Tue, 27 Sep 2016 19:40:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5d6408f6c -> b8ed4a511


KAFKA-4177; Remove ThrottledReplicationRateLimit from Server Config

This small PR pulls ThrottledReplicationRateLimit out of KafkaConfig and moves it into a new
class, DynamicConfig, that defines dynamic configs. Client configs are also placed in this
class, and validation is added.

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1864 from benstopford/KAFKA-4177
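
For illustration, a minimal sketch of the validation this change introduces (the property name
and the validate/parse behaviour are taken from the diff below; the surrounding setup is
hypothetical):

    import java.util.Properties
    import kafka.server.DynamicConfig

    // Accepted: a known dynamic broker config whose value meets the 1000 (1 KB/s) floor.
    val ok = new Properties()
    ok.put("replication.quota.throttled.rate", "1000000")
    DynamicConfig.Broker.validate(ok)

    // Rejected: an unknown name fails the require(...) check with an
    // IllegalArgumentException; an out-of-range value fails ConfigDef.parse
    // with a ConfigException.
    val bad = new Properties()
    bad.put("some.unknown.config", "x")
    DynamicConfig.Broker.validate(bad)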


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8ed4a51
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8ed4a51
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8ed4a51

Branch: refs/heads/trunk
Commit: b8ed4a51134ca8b98d1445871a55ed33b6ad5b92
Parents: 5d6408f
Author: Ben Stopford <benstopford@gmail.com>
Authored: Tue Sep 27 20:27:20 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Sep 27 20:37:49 2016 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  6 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  | 18 ++--
 .../kafka/admin/ReassignPartitionsCommand.scala |  8 +-
 .../main/scala/kafka/server/ConfigHandler.scala | 17 ++--
 .../main/scala/kafka/server/DynamicConfig.scala | 86 ++++++++++++++++++++
 .../main/scala/kafka/server/KafkaConfig.scala   |  7 --
 .../integration/kafka/api/BaseQuotaTest.scala   | 10 +--
 .../kafka/api/ClientIdQuotaTest.scala           |  6 +-
 .../kafka/api/UserClientIdQuotaTest.scala       |  6 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala | 26 +++---
 .../admin/ReassignPartitionsClusterTest.scala   |  6 +-
 .../kafka/admin/ReplicationQuotaUtils.scala     |  6 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  5 +-
 .../unit/kafka/server/DynamicConfigTest.scala   | 58 +++++++++++++
 .../kafka/server/ReplicationQuotasTest.scala    | 26 +++---
 .../test/scala/unit/kafka/utils/TestUtils.scala |  6 ++
 16 files changed, 211 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index b3f8e5c..0273bdb 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
-import kafka.server.{KafkaConfig, ConfigType}
+import kafka.server.{DynamicConfig, ConfigType}
 import kafka.utils._
 import kafka.utils.ZkUtils._
 import java.util.Random
@@ -488,6 +488,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *
    */
   def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) {
+    DynamicConfig.Client.validate(configs)
     changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs)
   }
 
@@ -503,6 +504,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *
    */
   def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
+    DynamicConfig.Client.validate(configs)
     changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
   }
 
@@ -532,7 +534,7 @@ object AdminUtils extends Logging with AdminUtilities {
     * @param configs: The config to change, as properties
     */
   def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
-    KafkaConfig.validateNames(configs)
+    DynamicConfig.Broker.validate(configs)
     brokers.foreach { broker =>
       changeEntityConfig(zkUtils, ConfigType.Broker, broker.toString, configs)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 700c54d..20048ec 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,16 +18,14 @@
 package kafka.admin
 
 import java.util.Properties
-
 import joptsimple._
-import kafka.admin.TopicCommand._
 import kafka.common.Config
-import kafka.log.{Defaults, LogConfig}
-import kafka.server.{KafkaConfig, QuotaConfigOverride, ConfigType, ConfigEntityName, QuotaId}
+import kafka.log.{LogConfig}
+import kafka.server.{ConfigEntityName, QuotaId}
+import kafka.server.{DynamicConfig, ConfigType}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
-
 import scala.collection.JavaConversions._
 import scala.collection._
 
@@ -277,12 +275,10 @@ object ConfigCommand extends Config {
 
     val nl = System.getProperty("line.separator")
     val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
-            "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
-            "For entity_type '" + ConfigType.Broker + "': " + nl + KafkaConfig.dynamicBrokerConfigs.map("\t" + _).mkString(nl) + nl +
-            "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + QuotaConfigOverride.ProducerOverride
-                                                            + nl + "\t" + QuotaConfigOverride.ConsumerOverride + nl +
-            "For entity_type '" + ConfigType.User + "': " + nl + "\t" + QuotaConfigOverride.ProducerOverride
-                                                          + nl + "\t" + QuotaConfigOverride.ConsumerOverride + nl +
+            "For entity_type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity_type '" + ConfigType.User + "': " + DynamicConfig.Client.names.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.map("\t" + _).mkString(nl, nl, nl) +
             s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
             .withRequiredArg
             .ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 5059463..2fa75f6 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -19,7 +19,7 @@ package kafka.admin
 import java.util.Properties
 import joptsimple.OptionParser
 import kafka.log.LogConfig
-import kafka.server.{ConfigType, KafkaConfig}
+import kafka.server.{DynamicConfig, ConfigType}
 import kafka.utils._
 import scala.collection._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -82,7 +82,7 @@ object ReassignPartitionsCommand extends Logging {
       //Remove the throttle limit from all brokers in the cluster
       for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
-        if (configs.remove(KafkaConfig.ThrottledReplicationRateLimitProp) != null) {
+        if (configs.remove(DynamicConfig.Broker.ThrottledReplicationRateLimitProp) != null){
           AdminUtils.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
           changed = true
         }
@@ -280,7 +280,7 @@ object ReassignPartitionsCommand extends Logging {
                       .describedAs("brokerlist")
                       .ofType(classOf[String])
     val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
-    val throttleOpt = parser.accepts("throttle", "The movement of partitions will be throttled to this value (bytes/sec). Rerunning with this option, whilst a rebalance is in progress, will alter the throttle value.")
+    val throttleOpt = parser.accepts("throttle", "The movement of partitions will be throttled to this value (bytes/sec). Rerunning with this option, whilst a rebalance is in progress, will alter the throttle value. The throttle rate should be at least 1 KB/s.")
                       .withRequiredArg()
                       .describedAs("throttle")
                       .defaultsTo("-1")
@@ -307,7 +307,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: Map[TopicAndPartit
 
       for (id <- brokers) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
-        configs.put(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString)
+        configs.put(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, throttle.toString)
         AdminUtils.changeBrokerConfig(zkUtils, Seq(id), configs)
       }
       println(f"The throttle limit was set to $throttle%,d B/s")

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 5be9c12..c3a07aa 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -19,10 +19,10 @@ package kafka.server
 
 import java.util.Properties
 
+import DynamicConfig.Broker._
 import kafka.api.ApiVersion
 import kafka.log.{LogConfig, LogManager}
 import kafka.server.Constants._
-import kafka.server.KafkaConfig._
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Logging
 import org.apache.kafka.common.config.ConfigDef.Validator
@@ -30,7 +30,6 @@ import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.metrics.Quota._
 import scala.collection.JavaConverters._
-
 /**
   * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
   */
@@ -96,10 +95,6 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
   }
 }
 
-object QuotaConfigOverride {
-  val ProducerOverride = "producer_byte_rate"
-  val ConsumerOverride = "consumer_byte_rate"
-}
 
 /**
  * Handles <client-id>, <user> or <user, client-id> quota config updates in ZK.
@@ -109,14 +104,14 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
 
   def updateQuotaConfig(sanitizedUser: Option[String], clientId: Option[String], config: Properties) {
     val producerQuota =
-      if (config.containsKey(QuotaConfigOverride.ProducerOverride))
-        Some(new Quota(config.getProperty(QuotaConfigOverride.ProducerOverride).toLong, true))
+      if (config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp))
+        Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong, true))
       else
         None
     quotaManagers.produce.updateQuota(sanitizedUser, clientId, producerQuota)
     val consumerQuota =
-      if (config.containsKey(QuotaConfigOverride.ConsumerOverride))
-        Some(new Quota(config.getProperty(QuotaConfigOverride.ConsumerOverride).toLong, true))
+      if (config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp))
+        Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong, true))
       else
         None
     quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
@@ -160,7 +155,7 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaC
 class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
   def processConfigChanges(brokerId: String, properties: Properties) {
     if (brokerConfig.brokerId == brokerId.trim.toInt) {
-      val limit = if (properties.containsKey(ThrottledReplicationRateLimitProp)) properties.getProperty(ThrottledReplicationRateLimitProp).toLong else Defaults.ThrottledReplicationRateLimit
+      val limit = if (properties.containsKey(ThrottledReplicationRateLimitProp)) properties.getProperty(ThrottledReplicationRateLimitProp).toLong else DefaultThrottledReplicationRateLimit
       quotaManagers.leader.updateQuota(upperBound(limit))
       quotaManagers.follower.updateQuota(upperBound(limit))
     }
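
As a hedged aside (a sketch, not part of the commit): the net effect of the handler above is
that every dynamic change recomputes the throttle, falling back to the default when the
property has been deleted. Roughly:

    import java.util.Properties
    import kafka.server.DynamicConfig.Broker._

    // Hypothetical helper mirroring BrokerConfigHandler.processConfigChanges.
    def effectiveLimit(properties: Properties): Long =
      if (properties.containsKey(ThrottledReplicationRateLimitProp))
        properties.getProperty(ThrottledReplicationRateLimitProp).toLong
      else
        DefaultThrottledReplicationRateLimit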

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
new file mode 100644
index 0000000..51e9818
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.Properties
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance._
+import org.apache.kafka.common.config.ConfigDef.Range._
+import org.apache.kafka.common.config.ConfigDef.Type._
+import scala.collection.JavaConverters._
+
+/**
+  * Class used to hold dynamic configs. These are configs which have no physical manifestation in the server.properties
+  * and can only be set dynamically.
+  */
+object DynamicConfig {
+
+  object Broker {
+    //Properties
+    val ThrottledReplicationRateLimitProp = "replication.quota.throttled.rate"
+
+    //Defaults
+    val DefaultThrottledReplicationRateLimit = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
+
+    //Documentation
+    val ThrottledReplicationRateLimitDoc = "A long representing the upper bound (bytes/sec) on replication traffic for replicas enumerated in the " +
+      s"property $ThrottledReplicationRateLimitProp. This property can be only set dynamically via the config command etc. The minimum value is 1 KB/s."
+
+    //Definitions
+    private val brokerConfigDef = new ConfigDef()
+      //round minimum value down, to make it easier for users.
+      .define(ThrottledReplicationRateLimitProp, LONG, DefaultThrottledReplicationRateLimit, atLeast(1000), MEDIUM, ThrottledReplicationRateLimitDoc)
+
+    def names = brokerConfigDef.names
+
+    def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props)
+  }
+
+  object Client {
+    //Properties
+    val ProducerByteRateOverrideProp = "producer_byte_rate"
+    val ConsumerByteRateOverrideProp = "consumer_byte_rate"
+
+    //Defaults
+    val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+    val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
+
+    //Documentation
+    val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
+    val ConsumerOverrideDoc = "A rate representing the upper bound (bytes/sec) for consumer traffic."
+
+    //Definitions
+    private val clientConfigs = new ConfigDef()
+      .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
+      .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
+
+    def names = clientConfigs.names
+
+    def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props)
+  }
+
+  private def validate(configDef: ConfigDef, props: Properties) = {
+    //Validate Names
+    val names = configDef.names()
+    props.keys.asScala.foreach { name =>
+      require(names.contains(name), s"Unknown Dynamic Configuration '$name'.")
+    }
+    //ValidateValues
+    configDef.parse(props)
+  }
+}
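
A hedged usage sketch for the Client side (the AdminUtils entry point and property names are
from this commit; the wiring around them is hypothetical):

    import java.util.Properties
    import kafka.admin.AdminUtils
    import kafka.server.DynamicConfig
    import kafka.utils.ZkUtils

    // Apply a 1 MB/s producer quota override for a client id. The call now
    // runs DynamicConfig.Client.validate(props) before writing to ZooKeeper.
    def throttleProducer(zkUtils: ZkUtils, clientId: String): Unit = {
      val props = new Properties()
      props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1048576")
      AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
    }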

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9cd05f1..c30e291 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -103,7 +103,6 @@ object Defaults {
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
-  val ThrottledReplicationRateLimit = Long.MaxValue
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -320,7 +319,6 @@ object KafkaConfig {
   val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
   val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
-  val ThrottledReplicationRateLimitProp = "replication.quota.throttled.rate"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
@@ -540,8 +538,6 @@ object KafkaConfig {
   val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
   val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
-  val ThrottledReplicationRateLimitDoc = "A long representing the upper bound (bytes/sec) on replication traffic for replicas enumerated in the " +
-    s"property $ThrottledReplicationRateLimitProp. This property can be only set dynamically via the config command."
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
@@ -582,8 +578,6 @@ object KafkaConfig {
   val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
   val SaslKerberosPrincipalToLocalRulesDoc = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
 
-  def dynamicBrokerConfigs = Seq(KafkaConfig.ThrottledReplicationRateLimitProp)
-
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -731,7 +725,6 @@ object KafkaConfig {
       .define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc)
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
       .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
-      .define(ThrottledReplicationRateLimitProp, LONG, Defaults.ThrottledReplicationRateLimit, atLeast(0), MEDIUM, ThrottledReplicationRateLimitDoc)
 
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index c9b7787..16ee636 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -16,7 +16,7 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.server.{QuotaConfigOverride, KafkaConfig, KafkaServer, QuotaId}
+import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer, QuotaId}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
@@ -105,8 +105,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   def testProducerConsumerOverrideUnthrottled() {
     // Give effectively unlimited quota for producer and consumer
     val props = new Properties()
-    props.put(QuotaConfigOverride.ProducerOverride, Long.MaxValue.toString)
-    props.put(QuotaConfigOverride.ConsumerOverride, Long.MaxValue.toString)
+    props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, Long.MaxValue.toString)
+    props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, Long.MaxValue.toString)
 
     overrideQuotas(Long.MaxValue, Long.MaxValue)
     waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
@@ -188,8 +188,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
     val props = new Properties()
-    props.put(QuotaConfigOverride.ProducerOverride, producerQuota.toString)
-    props.put(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString)
+    props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+    props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
     props
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 7477f7f..d71713f 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -17,7 +17,7 @@ package kafka.api
 import java.util.Properties
 
 import kafka.admin.AdminUtils
-import kafka.server.{KafkaConfig, QuotaConfigOverride, QuotaId}
+import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Before
 
@@ -36,11 +36,11 @@ class ClientIdQuotaTest extends BaseQuotaTest {
 
   override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
     val producerProps = new Properties()
-    producerProps.put(QuotaConfigOverride.ProducerOverride, producerQuota.toString)
+    producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
     updateQuotaOverride(producerClientId, producerProps)
 
     val consumerProps = new Properties()
-    consumerProps.put(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString)
+    consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
     updateQuotaOverride(consumerClientId, consumerProps)
   }
   override def removeQuotaOverrides() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 081fa0b..82b109d 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -19,7 +19,7 @@ import java.util.Properties
 
 import kafka.admin.AdminUtils
 
-import kafka.server.{KafkaConfig, ConfigEntityName, QuotaConfigOverride, QuotaId}
+import kafka.server._
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Before
@@ -46,11 +46,11 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
 
   override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
     val producerProps = new Properties()
-    producerProps.setProperty(QuotaConfigOverride.ProducerOverride, producerQuota.toString)
+    producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
     updateQuotaOverride(userPrincipal, producerClientId, producerProps)
 
     val consumerProps = new Properties()
-    consumerProps.setProperty(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString)
+    consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
     updateQuotaOverride(userPrincipal, consumerClientId, consumerProps)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 892e26b..4452da3 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -16,6 +16,7 @@
  */
 package kafka.admin
 
+import kafka.server.DynamicConfig.Broker._
 import kafka.server.KafkaConfig._
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
@@ -30,9 +31,8 @@ import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.common.TopicAndPartition
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
-
 import kafka.utils.TestUtils._
-
+import kafka.admin.AdminUtils._
 import scala.collection.{Map, immutable}
 
 class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@@ -445,16 +445,10 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
   @Test
   def shouldPropagateDynamicBrokerConfigs() {
     val brokerIds = Seq(0, 1, 2)
-    val servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(TestUtils.createServer(_))
-
-    def wrap(limit: Long): Properties = {
-      val props = new Properties()
-      props.setProperty(KafkaConfig.ThrottledReplicationRateLimitProp, limit.toString)
-      props
-    }
+    val servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
 
     def checkConfig(limit: Long) {
-      TestUtils.retry(10000) {
+      retry(10000) {
         for (server <- servers) {
           assertEquals("Leader Quota Manager was not updated", limit, server.quotaManagers.leader.upperBound)
           assertEquals("Follower Quota Manager was not updated", limit, server.quotaManagers.follower.upperBound)
@@ -463,26 +457,26 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     }
 
     try {
-      val limit: Long = 42
+      val limit: Long = 1000000
 
       // Set the limit & check it is applied to the log
-      AdminUtils.changeBrokerConfig(servers(0).zkUtils, brokerIds, wrap(limit))
+      changeBrokerConfig(servers(0).zkUtils, brokerIds,  wrapInProps(ThrottledReplicationRateLimitProp, limit.toString))
       checkConfig(limit)
 
       // Now double the config values for the topic and check that it is applied
       val newLimit = 2 * limit
-      AdminUtils.changeBrokerConfig(servers(0).zkUtils, brokerIds, wrap(newLimit))
+      changeBrokerConfig(servers(0).zkUtils, brokerIds,  wrapInProps(ThrottledReplicationRateLimitProp, newLimit.toString))
       checkConfig(newLimit)
 
       // Verify that the same config can be read from ZK
       for (brokerId <- brokerIds) {
         val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString)
-        assertEquals(newLimit, configInZk.getProperty(KafkaConfig.ThrottledReplicationRateLimitProp).toInt)
+        assertEquals(newLimit, configInZk.getProperty(ThrottledReplicationRateLimitProp).toInt)
       }
 
       //Now delete the config
-      AdminUtils.changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
-      checkConfig(kafka.server.Defaults.ThrottledReplicationRateLimit)
+      changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
+      checkConfig(DefaultThrottledReplicationRateLimit)
 
     } finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 38fca87..b96240c 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -161,9 +161,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     ), servers = servers)
 
     //Given throttle set so replication will take a while
-    val throttle: Long = 100 * 1000
-    produceMessages(servers, "topic1", 10, acks = 0, 100 * 1000)
-    produceMessages(servers, "topic2", 10, acks = 0, 100 * 1000)
+    val throttle: Long = 1000 * 1000
+    produceMessages(servers, "topic1", 100, acks = 0, 100 * 1000)
+    produceMessages(servers, "topic2", 100, acks = 0, 100 * 1000)
 
     //Start rebalance
     val newAssignment = Map(

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
index c6a42e2..e2c0541 100644
--- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -13,7 +13,7 @@
 package kafka.admin
 
 import kafka.log.LogConfig
-import kafka.server.{KafkaConfig, ConfigType, KafkaServer}
+import kafka.server.{DynamicConfig, KafkaConfig, ConfigType, KafkaServer}
 import kafka.utils.TestUtils
 
 import scala.collection.Seq
@@ -24,7 +24,7 @@ object ReplicationQuotaUtils {
     TestUtils.waitUntilTrue(() => {
       val brokerReset = servers.forall { server =>
         val brokerConfig = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker, server.config.brokerId.toString)
-        !brokerConfig.contains(KafkaConfig.ThrottledReplicationRateLimitProp)
+        !brokerConfig.contains(DynamicConfig.Broker.ThrottledReplicationRateLimitProp)
       }
       val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic, topic)
       val topicReset = !topicConfig.contains(LogConfig.ThrottledReplicasListProp)
@@ -37,7 +37,7 @@ object ReplicationQuotaUtils {
       //Check for limit in ZK
       val brokerConfigAvailable = servers.forall { server =>
         val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker, server.config.brokerId.toString)
-        val zkThrottleRate = configInZk.getProperty(KafkaConfig.ThrottledReplicationRateLimitProp)
+        val zkThrottleRate = configInZk.getProperty(DynamicConfig.Broker.ThrottledReplicationRateLimitProp)
         zkThrottleRate != null && expectedThrottleRate == zkThrottleRate.toLong
       }
       //Check replicas assigned

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 14faa80..01b1366 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -60,8 +60,9 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
                this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
     val props = new Properties()
-    props.put(QuotaConfigOverride.ProducerOverride, "1000")
-    props.put(QuotaConfigOverride.ConsumerOverride, "2000")
+    props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000")
+    props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
+
     val quotaManagers = servers.head.apis.quotas
     rootEntityType match {
       case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, props)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
new file mode 100644
index 0000000..7808bba
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -0,0 +1,58 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import kafka.admin.AdminUtils
+import kafka.utils.TestUtils._
+import kafka.utils.ZkUtils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.config._
+import org.easymock.EasyMock
+import org.junit.{Before, Test}
+
+class DynamicConfigTest {
+  private final val nonExistentConfig: String = "some.config.that.does.not.exist"
+  private final val someValue: String = "some interesting value"
+
+  var zkUtils: ZkUtils = _
+
+  @Before
+  def setUp() {
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailWhenChangingBrokerUnknownConfig() {
+    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), wrapInProps(nonExistentConfig, someValue))
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailWhenChangingClientIdUnknownConfig() {
+    AdminUtils.changeClientIdConfig(zkUtils, "ClientId", wrapInProps(nonExistentConfig, someValue))
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailWhenChangingUserUnknownConfig() {
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "UserId", wrapInProps(nonExistentConfig, someValue))
+  }
+
+  @Test(expected = classOf[ConfigException])
+  def shouldFailConfigsWithInvalidValues() {
+    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), wrapInProps(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, "-100"))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 88b9b89..e2739a0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -82,7 +82,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
       * regular replication works as expected.
       */
 
-    brokers = (100 to 105).map { id => TestUtils.createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers = (100 to 105).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
     //And two extra partitions 6,7, which we don't intend on throttling.
@@ -105,15 +105,15 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Set the throttle limit on all 8 brokers, but only assign throttled replicas to the six leaders, or two followers
     (100 to 107).foreach { brokerId =>
-      changeBrokerConfig(zkUtils, Seq(brokerId), property(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString))
+      changeBrokerConfig(zkUtils, Seq(brokerId), wrapInProps(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, throttle.toString))
     }
 
     //Either throttle the six leaders or the two followers
     val throttledReplicas = if (leaderThrottle) "0:100,1:101,2:102,3:103,4:104,5:105" else "0:106,1:106,2:106,3:107,4:107,5:107"
-    changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, throttledReplicas))
+    changeTopicConfig(zkUtils, topic, wrapInProps(ThrottledReplicasListProp, throttledReplicas))
 
     //Add data equally to each partition
-    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
     (0 until msgCount).foreach { x =>
       (0 to 7).foreach { partition =>
         producer.send(new ProducerRecord(topic, partition, null, msg))
@@ -177,7 +177,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     //2 brokers with 1MB Segment Size & 1 partition
     val config: Properties = createBrokerConfig(100, zkConnect)
     config.put("log.segment.bytes", (1024 * 1024).toString)
-    brokers = Seq(TestUtils.createServer(fromProps(config)))
+    brokers = Seq(createServer(fromProps(config)))
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
 
     //Write 20MBs and throttle at 5MB/s
@@ -187,8 +187,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val throttle: Long = msg.length * msgCount / expectedDuration
 
     //Set the throttle limit leader
-    changeBrokerConfig(zkUtils, Seq(100), property(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString))
-    changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:100"))
+    changeBrokerConfig(zkUtils, Seq(100), wrapInProps(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, throttle.toString))
+    changeTopicConfig(zkUtils, topic, wrapInProps(ThrottledReplicasListProp, "0:100"))
 
     //Add data
     addData(msgCount, msg)
@@ -196,7 +196,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val start = System.currentTimeMillis()
 
     //Start the new broker (and hence start replicating)
-    brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(101, zkConnect)))
+    brokers = brokers :+ createServer(fromProps(createBrokerConfig(101, zkConnect)))
     waitForOffsetsToMatch(msgCount, 0, 101)
 
     val throttledTook = System.currentTimeMillis() - start
@@ -208,7 +208,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
   }
 
   def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
-    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
     (0 until msgCount).foreach { x => producer.send(new ProducerRecord(topic, msg)).get }
     waitForOffsetsToMatch(msgCount, 0, 100)
   }
@@ -220,17 +220,11 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     }, s"Offsets did not match for partition $partitionId on broker $brokerId", 60000)
   }
 
-  private def property(key: String, value: String) = {
-    val props = new Properties()
-    props.put(key, value)
-    props
-  }
-
   private def brokerFor(id: Int): KafkaServer = brokers.filter(_.config.brokerId == id).head
 
   def createBrokers(brokerIds: Seq[Int]): Unit = {
     brokerIds.foreach { id =>
-      brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(id, zkConnect)))
+      brokers = brokers :+ createServer(fromProps(createBrokerConfig(id, zkConnect)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ed4a51/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4e1d278..29ce693 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1126,6 +1126,12 @@ object TestUtils extends Logging {
 
   }
 
+  def wrapInProps(key: String, value: String): Properties = {
+    val props: Properties = new Properties()
+    props.put(key, value)
+    props
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {

