kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-4158; Reset quota to default value if quota override is deleted
Date Wed, 14 Sep 2016 00:34:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2a660f13d -> 41e676d29


KAFKA-4158; Reset quota to default value if quota override is deleted

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>

Closes #1851 from lindong28/KAFKA-4158


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41e676d2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41e676d2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41e676d2

Branch: refs/heads/trunk
Commit: 41e676d29587042994a72baa5000a8861a075c8c
Parents: 2a660f1
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Sep 13 17:33:54 2016 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Sep 13 17:33:54 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/ClientQuotaManager.scala  |  8 ++++++++
 .../main/scala/kafka/server/ConfigHandler.scala  |  4 ++++
 .../kafka/server/DynamicConfigChangeTest.scala   | 19 +++++++++++++++++--
 3 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41e676d2/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c99ba97..5e90080 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -243,6 +243,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
+   * Reset quotas to the default value for the given clientId
+   * @param clientId client to override
+   */
+  def resetQuota(clientId: String) = {
+    updateQuota(clientId, defaultQuota)
+  }
+
+  /**
    * Overrides quotas per clientId
    * @param clientId client to override
    * @param quota custom quota to apply

http://git-wip-us.apache.org/repos/asf/kafka/blob/41e676d2/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ab1d782..d07fdd8 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -82,11 +82,15 @@ class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaMan
     if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
       quotaManagers(ApiKeys.PRODUCE.id).updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong,
true))
+    } else {
+      quotaManagers(ApiKeys.PRODUCE.id).resetQuota(clientId)
     }
 
     if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
       quotaManagers(ApiKeys.FETCH.id).updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong,
true))
+    } else {
+      quotaManagers(ApiKeys.FETCH.id).resetQuota(clientId)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/41e676d2/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index af979e4..a9df929 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -65,10 +65,9 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     props.put(ClientConfigOverride.ProducerOverride, "1000")
     props.put(ClientConfigOverride.ConsumerOverride, "2000")
     AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
+    val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
 
     TestUtils.retry(10000) {
-      val configHandler = this.servers.head.dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
-      val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
       val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
       val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
 
@@ -77,6 +76,22 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
         assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
         Quota.upperBound(2000), overrideConsumerQuota)
     }
+
+    val defaultProducerQuota = servers.head.apis.config.producerQuotaBytesPerSecondDefault.doubleValue
+    val defaultConsumerQuota = servers.head.apis.config.consumerQuotaBytesPerSecondDefault.doubleValue
+    assertNotEquals("defaultProducerQuota should be different from 1000", 1000, defaultProducerQuota)
+    assertNotEquals("defaultConsumerQuota should be different from 2000", 2000, defaultConsumerQuota)
+    AdminUtils.changeClientIdConfig(zkUtils, clientId, new Properties())
+
+    TestUtils.retry(10000) {
+      val producerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
+      val consumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
+
+      assertEquals(s"ClientId $clientId must have reset producer quota to " + defaultProducerQuota,
+        Quota.upperBound(defaultProducerQuota), producerQuota)
+      assertEquals(s"ClientId $clientId must have reset consumer quota to " + defaultConsumerQuota,
+        Quota.upperBound(defaultConsumerQuota), consumerQuota)
+    }
   }
 
   @Test


Mime
View raw message