kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Rename Throttling config variables to match config name
Date Mon, 03 Oct 2016 20:15:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 49e4f85d3 -> e62e3b745


MINOR: Rename Throttling config variables to match config name

ThrottledLeaderReplicationRate* => LeaderReplicationThrottledRate*
ThrottledFollowerReplicationRate* => FollowerReplicationThrottledRate*
LeaderThrottledReplicasList* => LeaderReplicationThrottledReplicas*
FollowerThrottledReplicasList* => FollowerReplicationThrottledReplicas*

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1951 from benstopford/rename-ThrottledLeaderReplicationRateProp

(cherry picked from commit 496594a12c67d468c5ab58d83bf31caea3154e56)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e62e3b74
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e62e3b74
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e62e3b74

Branch: refs/heads/0.10.1
Commit: e62e3b745141d5fd6b461634f3768ef8a42d2eaa
Parents: 49e4f85
Author: Ben Stopford <benstopford@gmail.com>
Authored: Mon Oct 3 18:21:57 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Oct 3 21:15:21 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala | 16 +++++------
 core/src/main/scala/kafka/log/LogConfig.scala   | 24 ++++++++---------
 .../main/scala/kafka/server/ConfigHandler.scala | 10 +++----
 .../main/scala/kafka/server/DynamicConfig.scala | 18 ++++++-------
 .../test/scala/unit/kafka/admin/AdminTest.scala | 24 ++++++++---------
 .../admin/ReassignPartitionsCommandTest.scala   | 28 ++++++++++----------
 .../kafka/admin/ReplicationQuotaUtils.scala     | 16 +++++------
 .../kafka/server/DynamicConfigChangeTest.scala  | 16 +++++------
 .../unit/kafka/server/DynamicConfigTest.scala   |  4 +--
 .../kafka/server/ReplicationQuotasTest.scala    | 12 ++++-----
 10 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 06e4120..a037fd4 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -84,8 +84,8 @@ object ReassignPartitionsCommand extends Logging {
       for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
         // bitwise OR as we don't want to short-circuit
-        if (configs.remove(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp) != null
-          | configs.remove(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp) !=
null){
+        if (configs.remove(DynamicConfig.Broker.LeaderReplicationThrottledRateProp) != null
+          | configs.remove(DynamicConfig.Broker.FollowerReplicationThrottledRateProp) !=
null){
           AdminUtils.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
           changed = true
         }
@@ -96,8 +96,8 @@ object ReassignPartitionsCommand extends Logging {
       for (topic <- topics) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
         // bitwise OR as we don't want to short-circuit
-        if (configs.remove(LogConfig.LeaderThrottledReplicasListProp) != null
-          | configs.remove(LogConfig.FollowerThrottledReplicasListProp) != null){
+        if (configs.remove(LogConfig.LeaderReplicationThrottledReplicasProp) != null
+          | configs.remove(LogConfig.FollowerReplicationThrottledReplicasProp) != null){
           AdminUtils.changeTopicConfig(zkUtils, topic, configs)
           changed = true
         }
@@ -330,8 +330,8 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment:
Map[TopicA
 
       for (id <- brokers) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
-        configs.put(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp, throttle.toString)
-        configs.put(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp, throttle.toString)
+        configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString)
+        configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
         AdminUtils.changeBrokerConfig(zkUtils, Seq(id), configs)
       }
       println(s"The throttle limit was set to $throttle B/s")
@@ -350,8 +350,8 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment:
Map[TopicA
       val follower = format(postRebalanceReplicasThatMoved(existing, proposed))
 
       val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
-      configs.put(LeaderThrottledReplicasListProp, leader)
-      configs.put(FollowerThrottledReplicasListProp, follower)
+      configs.put(LeaderReplicationThrottledReplicasProp, leader)
+      configs.put(FollowerReplicationThrottledReplicasProp, follower)
       admin.changeTopicConfig(zkUtils, topic, configs)
 
       debug(s"Updated leader-throttled replicas for topic $topic with: $leader")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index a934fcd..d5b5884 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -54,8 +54,8 @@ object Defaults {
   val MessageFormatVersion = kafka.server.Defaults.LogMessageFormatVersion
   val MessageTimestampType = kafka.server.Defaults.LogMessageTimestampType
   val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs
-  val LeaderThrottledReplicasList = Collections.emptyList[String]()
-  val FollowerThrottledReplicasList = Collections.emptyList[String]()
+  val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
+  val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef,
props, false) {
@@ -86,8 +86,8 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
   val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp))
   val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
   val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
-  val leaderThrottledReplicasList = getList(LogConfig.LeaderThrottledReplicasListProp)
-  val followerThrottledReplicasList = getList(LogConfig.FollowerThrottledReplicasListProp)
+  val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
+  val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs,
segmentMs)
@@ -124,8 +124,8 @@ object LogConfig {
   val MessageFormatVersionProp = "message.format.version"
   val MessageTimestampTypeProp = "message.timestamp.type"
   val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
-  val LeaderThrottledReplicasListProp = "leader.replication.throttled.replicas"
-  val FollowerThrottledReplicasListProp = "follower.replication.throttled.replicas"
+  val LeaderReplicationThrottledReplicasProp = "leader.replication.throttled.replicas"
+  val FollowerReplicationThrottledReplicasProp = "follower.replication.throttled.replicas"
 
   val SegmentSizeDoc = "This configuration controls the segment file size for " +
     "the log. Retention and cleaning is always done a file at a time so a larger " +
@@ -196,9 +196,9 @@ object LogConfig {
   val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp
when a broker receives " +
     "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime,
a message will be rejected " +
     "if the difference in timestamp exceeds this threshold. This configuration is ignored
if message.timestamp.type=LogAppendTime."
-  val LeaderThrottledReplicasListDoc = "A list of replicas for which log replication should
be throttled on the leader side. The list should describe a set of " +
+  val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication
should be throttled on the leader side. The list should describe a set of " +
     "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively
the wildcard '*' can be used to throttle all replicas for this topic."
-  val FollowerThrottledReplicasListDoc = "A list of replicas for which log replication should
be throttled on the follower side. The list should describe a set of " +
+  val FollowerReplicationThrottledReplicasDoc = "A list of replicas for which log replication
should be throttled on the follower side. The list should describe a set of " +
     "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively
the wildcard '*' can be used to throttle all replicas for this topic."
 
   private class LogConfigDef extends ConfigDef {
@@ -289,10 +289,10 @@ object LogConfig {
         KafkaConfig.LogMessageTimestampTypeProp)
       .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs,
         atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc, KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
-      .define(LeaderThrottledReplicasListProp, LIST, Defaults.LeaderThrottledReplicasList,
ThrottledReplicaListValidator, MEDIUM,
-        LeaderThrottledReplicasListDoc, LeaderThrottledReplicasListProp)
-      .define(FollowerThrottledReplicasListProp, LIST, Defaults.FollowerThrottledReplicasList,
ThrottledReplicaListValidator, MEDIUM,
-        FollowerThrottledReplicasListDoc, FollowerThrottledReplicasListProp)
+      .define(LeaderReplicationThrottledReplicasProp, LIST, Defaults.LeaderReplicationThrottledReplicas,
ThrottledReplicaListValidator, MEDIUM,
+        LeaderReplicationThrottledReplicasDoc, LeaderReplicationThrottledReplicasProp)
+      .define(FollowerReplicationThrottledReplicasProp, LIST, Defaults.FollowerReplicationThrottledReplicas,
ThrottledReplicaListValidator, MEDIUM,
+        FollowerReplicationThrottledReplicasDoc, FollowerReplicationThrottledReplicasProp)
   }
 
   def apply(): LogConfig = LogConfig(new Properties())

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 82e65a2..dc21f83 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -76,8 +76,8 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig:
KafkaC
         logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
       }
     }
-    updateThrottledList(LogConfig.LeaderThrottledReplicasListProp, quotas.leader)
-    updateThrottledList(LogConfig.FollowerThrottledReplicasListProp, quotas.follower)
+    updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
+    updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)
   }
 
   def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: String): Seq[Int]
= {
@@ -159,11 +159,11 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private
val quo
       if (properties.containsKey(prop))
         properties.getProperty(prop).toLong
       else
-        DefaultThrottledReplicationRate
+        DefaultReplicationThrottledRate
     }
     if (brokerConfig.brokerId == brokerId.trim.toInt) {
-      quotaManagers.leader.updateQuota(upperBound(getOrDefault(ThrottledLeaderReplicationRateProp)))
-      quotaManagers.follower.updateQuota(upperBound(getOrDefault(ThrottledFollowerReplicationRateProp)))
+      quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
+      quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 8a62af8..4a9d0a9 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -33,25 +33,25 @@ object DynamicConfig {
 
   object Broker {
     //Properties
-    val ThrottledLeaderReplicationRateProp = "leader.replication.throttled.rate"
-    val ThrottledFollowerReplicationRateProp = "follower.replication.throttled.rate"
+    val LeaderReplicationThrottledRateProp = "leader.replication.throttled.rate"
+    val FollowerReplicationThrottledRateProp = "follower.replication.throttled.rate"
 
     //Defaults
-    val DefaultThrottledReplicationRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
+    val DefaultReplicationThrottledRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
 
     //Documentation
-    val ThrottledLeaderReplicationRateDoc = "A long representing the upper bound (bytes/sec)
on replication traffic for leaders enumerated in the " +
-      s"property ${LogConfig.LeaderThrottledReplicasListProp} (for each topic). This property
can be only set dynamically. It is suggested that the " +
+    val LeaderReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec)
on replication traffic for leaders enumerated in the " +
+      s"property ${LogConfig.LeaderReplicationThrottledReplicasProp} (for each topic). This
property can be only set dynamically. It is suggested that the " +
       s"limit be kept above 1MB/s for accurate behaviour."
-    val ThrottledFollowerReplicationRateDoc = "A long representing the upper bound (bytes/sec)
on replication traffic for followers enumerated in the " +
-      s"property ${LogConfig.FollowerThrottledReplicasListProp} (for each topic). This property
can be only set dynamically. It is suggested that the " +
+    val FollowerReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec)
on replication traffic for followers enumerated in the " +
+      s"property ${LogConfig.FollowerReplicationThrottledReplicasProp} (for each topic).
This property can be only set dynamically. It is suggested that the " +
       s"limit be kept above 1MB/s for accurate behaviour."
 
     //Definitions
     private val brokerConfigDef = new ConfigDef()
       //round minimum value down, to make it easier for users.
-      .define(ThrottledLeaderReplicationRateProp, LONG, DefaultThrottledReplicationRate,
atLeast(0), MEDIUM, ThrottledLeaderReplicationRateDoc)
-      .define(ThrottledFollowerReplicationRateProp, LONG, DefaultThrottledReplicationRate,
atLeast(0), MEDIUM, ThrottledFollowerReplicationRateDoc)
+      .define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate,
atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
+      .define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate,
atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
 
     def names = brokerConfigDef.names
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index ee980e2..693c758 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -392,8 +392,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
       val props = new Properties()
       props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
       props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
-      props.setProperty(LogConfig.LeaderThrottledReplicasListProp, throttledLeaders)
-      props.setProperty(LogConfig.FollowerThrottledReplicasListProp, throttledFollowers)
+      props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, throttledLeaders)
+      props.setProperty(LogConfig.FollowerReplicationThrottledReplicasProp, throttledFollowers)
       props
     }
 
@@ -411,8 +411,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
           assertTrue(log.isDefined)
           assertEquals(retentionMs, log.get.config.retentionMs)
           assertEquals(messageSize, log.get.config.maxMessageSize)
-          checkList(log.get.config.leaderThrottledReplicasList, throttledLeaders)
-          checkList(log.get.config.followerThrottledReplicasList, throttledFollowers)
+          checkList(log.get.config.LeaderReplicationThrottledReplicas, throttledLeaders)
+          checkList(log.get.config.FollowerReplicationThrottledReplicas, throttledFollowers)
           assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(TopicAndPartition(topic,
part)))
         }
       }
@@ -450,7 +450,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
       checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled
= true)
 
       //Now ensure updating to "" removes the throttled replica list also
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerThrottledReplicasListProp,
""), (LogConfig.LeaderThrottledReplicasListProp, "")))
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp,
""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
       checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",  quotaManagerIsThrottled
= false)
 
     } finally {
@@ -478,27 +478,27 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
 
       // Set the limit & check it is applied to the log
       changeBrokerConfig(zkUtils, brokerIds, propsWith(
-        (ThrottledLeaderReplicationRateProp, limit.toString),
-        (ThrottledFollowerReplicationRateProp, limit.toString)))
+        (LeaderReplicationThrottledRateProp, limit.toString),
+        (FollowerReplicationThrottledRateProp, limit.toString)))
       checkConfig(limit)
 
       // Now double the config values for the topic and check that it is applied
       val newLimit = 2 * limit
       changeBrokerConfig(zkUtils, brokerIds,  propsWith(
-        (ThrottledLeaderReplicationRateProp, newLimit.toString),
-        (ThrottledFollowerReplicationRateProp, newLimit.toString)))
+        (LeaderReplicationThrottledRateProp, newLimit.toString),
+        (FollowerReplicationThrottledRateProp, newLimit.toString)))
       checkConfig(newLimit)
 
       // Verify that the same config can be read from ZK
       for (brokerId <- brokerIds) {
         val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker,
brokerId.toString)
-        assertEquals(newLimit, configInZk.getProperty(ThrottledLeaderReplicationRateProp).toInt)
-        assertEquals(newLimit, configInZk.getProperty(ThrottledFollowerReplicationRateProp).toInt)
+        assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
+        assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
       }
 
       //Now delete the config
       changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
-      checkConfig(DefaultThrottledReplicationRate)
+      checkConfig(DefaultReplicationThrottledRate)
 
     } finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 0e96795..2a3724e 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -66,8 +66,8 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging
wi
 
     val mock = new TestAdminUtils {
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
-        assertEquals("0:102", configChange.get(FollowerThrottledReplicasListProp)) //Should
only be follower-throttle the moving replica
-        assertEquals("0:100,0:101", configChange.get(LeaderThrottledReplicasListProp)) //Should
leader-throttle all existing (pre move) replicas
+        assertEquals("0:102", configChange.get(FollowerReplicationThrottledReplicasProp))
//Should only be follower-throttle the moving replica
+        assertEquals("0:100,0:101", configChange.get(LeaderReplicationThrottledReplicasProp))
//Should leader-throttle all existing (pre move) replicas
         calls += 1
       }
     }
@@ -88,8 +88,8 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging
wi
     // Then
     val mock = new TestAdminUtils {
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
-        assertEquals("0:102,1:102", configChange.get(FollowerThrottledReplicasListProp))
//Should only be follower-throttle the moving replica
-        assertEquals("0:100,0:101,1:100,1:101", configChange.get(LeaderThrottledReplicasListProp))
//Should leader-throttle all existing (pre move) replicas
+        assertEquals("0:102,1:102", configChange.get(FollowerReplicationThrottledReplicasProp))
//Should only be follower-throttle the moving replica
+        assertEquals("0:100,0:101,1:100,1:101", configChange.get(LeaderReplicationThrottledReplicasProp))
//Should leader-throttle all existing (pre move) replicas
         calls += 1
       }
     }
@@ -113,11 +113,11 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
         topic match {
           case "topic1" =>
-            assertEquals("0:100,0:101", configChange.get(LeaderThrottledReplicasListProp))
-            assertEquals("0:102", configChange.get(FollowerThrottledReplicasListProp))
+            assertEquals("0:100,0:101", configChange.get(LeaderReplicationThrottledReplicasProp))
+            assertEquals("0:102", configChange.get(FollowerReplicationThrottledReplicasProp))
           case "topic2" =>
-            assertEquals("0:101,0:102", configChange.get(LeaderThrottledReplicasListProp))
-            assertEquals("0:100", configChange.get(FollowerThrottledReplicasListProp))
+            assertEquals("0:101,0:102", configChange.get(LeaderReplicationThrottledReplicasProp))
+            assertEquals("0:100", configChange.get(FollowerReplicationThrottledReplicasProp))
           case _ => fail("Unexpected topic $topic")
         }
         calls += 1
@@ -152,11 +152,11 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
         topic match {
           case "topic1" =>
-            assertEquals("0:102,1:102", configChange.get(FollowerThrottledReplicasListProp))
-            assertEquals("0:100,0:101,1:100,1:101", configChange.get(LeaderThrottledReplicasListProp))
+            assertEquals("0:102,1:102", configChange.get(FollowerReplicationThrottledReplicasProp))
+            assertEquals("0:100,0:101,1:100,1:101", configChange.get(LeaderReplicationThrottledReplicasProp))
           case "topic2" =>
-            assertEquals("0:100,1:100", configChange.get(FollowerThrottledReplicasListProp))
-            assertEquals("0:101,0:102,1:101,1:102", configChange.get(LeaderThrottledReplicasListProp))
+            assertEquals("0:100,1:100", configChange.get(FollowerReplicationThrottledReplicasProp))
+            assertEquals("0:101,0:102,1:101,1:102", configChange.get(LeaderReplicationThrottledReplicasProp))
           case _ => fail()
         }
         calls += 1
@@ -180,8 +180,8 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
     // Then
     val mock = new TestAdminUtils {
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties)
= {
-        assertEquals("0:104,0:105", configChange.get(FollowerThrottledReplicasListProp))
//Should only be follower-throttle the moving replicas
-        assertEquals("0:100,0:101,0:102,0:103", configChange.get(LeaderThrottledReplicasListProp))
//Should leader-throttle all existing (pre move) replicas
+        assertEquals("0:104,0:105", configChange.get(FollowerReplicationThrottledReplicasProp))
//Should only be follower-throttle the moving replicas
+        assertEquals("0:100,0:101,0:102,0:103", configChange.get(LeaderReplicationThrottledReplicasProp))
//Should leader-throttle all existing (pre move) replicas
         calls += 1
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
index dc50f61..fff4ea1 100644
--- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -24,12 +24,12 @@ object ReplicationQuotaUtils {
     TestUtils.waitUntilTrue(() => {
       val hasRateProp = servers.forall { server =>
         val brokerConfig = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker,
server.config.brokerId.toString)
-        brokerConfig.contains(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp) ||
-          brokerConfig.contains(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp)
+        brokerConfig.contains(DynamicConfig.Broker.LeaderReplicationThrottledRateProp) ||
+          brokerConfig.contains(DynamicConfig.Broker.FollowerReplicationThrottledRateProp)
       }
       val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic,
topic)
-      val hasReplicasProp = topicConfig.contains(LogConfig.LeaderThrottledReplicasListProp)
||
-        topicConfig.contains(LogConfig.FollowerThrottledReplicasListProp)
+      val hasReplicasProp = topicConfig.contains(LogConfig.LeaderReplicationThrottledReplicasProp)
||
+        topicConfig.contains(LogConfig.FollowerReplicationThrottledReplicasProp)
       !hasRateProp && !hasReplicasProp
     }, "Throttle limit/replicas was not unset")
   }
@@ -39,15 +39,15 @@ object ReplicationQuotaUtils {
       //Check for limit in ZK
       val brokerConfigAvailable = servers.forall { server =>
         val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker,
server.config.brokerId.toString)
-        val zkLeaderRate = configInZk.getProperty(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp)
-        val zkFollowerRate = configInZk.getProperty(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp)
+        val zkLeaderRate = configInZk.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp)
+        val zkFollowerRate = configInZk.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp)
         zkLeaderRate != null && expectedThrottleRate == zkLeaderRate.toLong &&
           zkFollowerRate != null && expectedThrottleRate == zkFollowerRate.toLong
       }
       //Check replicas assigned
       val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic,
topic)
-      val leader = topicConfig.getProperty(LogConfig.LeaderThrottledReplicasListProp)
-      val follower = topicConfig.getProperty(LogConfig.FollowerThrottledReplicasListProp)
+      val leader = topicConfig.getProperty(LogConfig.LeaderReplicationThrottledReplicasProp)
+      val follower = topicConfig.getProperty(LogConfig.FollowerReplicationThrottledReplicasProp)
       val topicConfigAvailable = (leader == throttledLeaders && follower == throttledFollowers)
       brokerConfigAvailable && topicConfigAvailable
     }, "throttle limit/replicas was not set")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cf3413a..4b44b1f 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -203,11 +203,11 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val props: Properties = new Properties()
 
     //Given
-    props.put(LeaderThrottledReplicasListProp, "0:101,0:102,1:101,1:102")
+    props.put(LeaderReplicationThrottledReplicasProp, "0:101,0:102,1:101,1:102")
 
     //When/Then
-    assertEquals(Seq(0,1), configHandler.parseThrottledPartitions(props, 102, LeaderThrottledReplicasListProp))
-    assertEquals(Seq(), configHandler.parseThrottledPartitions(props, 103, LeaderThrottledReplicasListProp))
+    assertEquals(Seq(0,1), configHandler.parseThrottledPartitions(props, 102, LeaderReplicationThrottledReplicasProp))
+    assertEquals(Seq(), configHandler.parseThrottledPartitions(props, 103, LeaderReplicationThrottledReplicasProp))
   }
 
   @Test
@@ -216,10 +216,10 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val props: Properties = new Properties()
 
     //Given
-    props.put(LeaderThrottledReplicasListProp, "*")
+    props.put(LeaderReplicationThrottledReplicasProp, "*")
 
     //When
-    val result = configHandler.parseThrottledPartitions(props, 102, LeaderThrottledReplicasListProp)
+    val result = configHandler.parseThrottledPartitions(props, 102, LeaderReplicationThrottledReplicasProp)
 
     //Then
     assertEquals(AllReplicas, result)
@@ -231,10 +231,10 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val props: Properties = new Properties()
 
     //Given
-    props.put(FollowerThrottledReplicasListProp, "")
+    props.put(FollowerReplicationThrottledReplicasProp, "")
 
     //When
-    val result = configHandler.parseThrottledPartitions(props, 102, FollowerThrottledReplicasListProp)
+    val result = configHandler.parseThrottledPartitions(props, 102, FollowerReplicationThrottledReplicasProp)
 
     //Then
     assertEquals(Seq(), result)
@@ -251,6 +251,6 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
   }
 
   def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
-    configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderThrottledReplicasListProp, value), 102, LeaderThrottledReplicasListProp)
+    configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderReplicationThrottledReplicasProp, value), 102, LeaderReplicationThrottledReplicasProp)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index c481dc7..9e6b1b2 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -54,12 +54,12 @@ class DynamicConfigTest {
   @Test(expected = classOf[ConfigException])
   def shouldFailLeaderConfigsWithInvalidValues() {
     AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
-      propsWith(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp, "-100"))
+      propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "-100"))
   }
 
   @Test(expected = classOf[ConfigException])
   def shouldFailFollowerConfigsWithInvalidValues() {
     AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
-      propsWith(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp, "-100"))
+      propsWith(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "-100"))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e62e3b74/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 9107067..511052d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -107,16 +107,16 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     (100 to 107).foreach { brokerId =>
       changeBrokerConfig(zkUtils, Seq(brokerId),
         propsWith(
-          (DynamicConfig.Broker.ThrottledLeaderReplicationRateProp, throttle.toString),
-          (DynamicConfig.Broker.ThrottledFollowerReplicationRateProp, throttle.toString)
+          (DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString),
+          (DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
         ))
     }
 
     //Either throttle the six leaders or the two followers
     if (leaderThrottle)
-      changeTopicConfig(zkUtils, topic, propsWith(LeaderThrottledReplicasListProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
+      changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
     else
-      changeTopicConfig(zkUtils, topic, propsWith(FollowerThrottledReplicasListProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
+      changeTopicConfig(zkUtils, topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
 
     //Add data equally to each partition
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
@@ -193,8 +193,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val throttle: Long = msg.length * msgCount / expectedDuration
 
     //Set the throttle to only limit leader
-    changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp, throttle.toString))
-    changeTopicConfig(zkUtils, topic, propsWith(LeaderThrottledReplicasListProp, "0:100"))
+    changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
+    changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
 
     //Add data
     addData(msgCount, msg)


Mime
View raw message