kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3310; Fix for NPEs observed when throttling clients.
Date Fri, 04 Mar 2016 00:17:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f25fe02d9 -> 5d8936544


KAFKA-3310; Fix for NPEs observed when throttling clients.

The fix basically ensures that the throttleTimeSensor is non-null before handing off to record
the metric value. We also record the throttle time to 0 so that we don't recreate the sensor
always.

Author: Aditya Auradkar <aauradkar@linkedin.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #989 from auradkar/KAFKA-3310


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d893654
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d893654
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d893654

Branch: refs/heads/trunk
Commit: 5d893654489647e8be65c4d54864ab63b7285faa
Parents: f25fe02
Author: Aditya Auradkar <aauradkar@linkedin.com>
Authored: Thu Mar 3 16:16:56 2016 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Mar 3 16:16:56 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/ClientQuotaManager.scala | 11 +++--
 .../kafka/server/ClientQuotaManagerTest.scala   | 52 ++++++++++++++++++--
 2 files changed, 54 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d893654/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5ec57ce..5863c72 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -120,9 +120,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
         throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
+        // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
         delayQueueSensor.record()
-        // If delayed, add the element to the delayQueue
         logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(),
throttleTimeMs))
     }
     throttleTimeMs
@@ -189,9 +189,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     }
 
     /* If the sensor is null, try to create it else return the created sensor
-     * Also if quota sensor is null, the throttle time sensor must be null
+     * Either of the sensors can be null, hence null checks on both
      */
-    if (quotaSensor == null) {
+    if (quotaSensor == null || throttleTimeSensor == null) {
       /* Acquire a write lock because the sensor may not have been created and we only want
one thread to create it.
        * Note that multiple threads may acquire the write lock if they all see a null sensor
initially
        * In this case, the writer checks the sensor after acquiring the lock again.
@@ -204,7 +204,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
         // ensure that we initialise `ClientSensors` with non-null parameters.
         quotaSensor = metrics.getSensor(quotaSensorName)
         throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
-        if (quotaSensor == null) {
+        if (throttleTimeSensor == null) {
           // create the throttle time sensor also. Use default metric config
           throttleTimeSensor = metrics.sensor(throttleTimeSensorName,
                                               null,
@@ -214,7 +214,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                                                 "Tracking average throttle-time per client",
                                                 "client-id",
                                                 clientId), new Avg())
+        }
+
 
+        if (quotaSensor == null) {
           quotaSensor = metrics.sensor(quotaSensorName,
                                        getQuotaMetricConfig(quota(clientId)),
                                        ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d893654/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 68d6932..193acfd 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import java.util.Collections
 
-import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
 import org.apache.kafka.common.utils.MockTime
 import org.junit.Assert.{assertEquals, assertTrue}
@@ -44,8 +43,8 @@ class ClientQuotaManagerTest {
     val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time)
 
     // Case 1: Update the quota. Assert that the new quota value is returned
-    clientMetrics.updateQuota("p1", new Quota(2000, true));
-    clientMetrics.updateQuota("p2", new Quota(4000, true));
+    clientMetrics.updateQuota("p1", new Quota(2000, true))
+    clientMetrics.updateQuota("p2", new Quota(4000, true))
 
     try {
       assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.quota("random-client-id"))
@@ -58,14 +57,14 @@ class ClientQuotaManagerTest {
 
       // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well
since the sensor was created.
       // p1 should not longer be throttled after the quota change
-      clientMetrics.updateQuota("p1", new Quota(3000, true));
+      clientMetrics.updateQuota("p1", new Quota(3000, true))
       assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true),
clientMetrics.quota("p1"))
 
       throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
       // Case 3: Change quota back to default. Should be throttled again
-      clientMetrics.updateQuota("p1", new Quota(500, true));
+      clientMetrics.updateQuota("p1", new Quota(500, true))
       assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota("p1"))
 
       throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback)
@@ -123,6 +122,49 @@ class ClientQuotaManagerTest {
     }
   }
 
+  @Test
+  def testExpireThrottleTimeSensor() {
+    val metrics = newMetrics
+    val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
+    try {
+      clientMetrics.recordAndMaybeThrottle("client1", 100, callback)
+      // remove the throttle time sensor
+      metrics.removeSensor("producerThrottleTime-client1")
+      // should not throw an exception even if the throttle time sensor does not exist.
+      val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback)
+      assertTrue("Should be throttled", throttleTime > 0)
+      // the sensor should get recreated
+      val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1")
+      assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testExpireQuotaSensors() {
+    val metrics = newMetrics
+    val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
+    try {
+      clientMetrics.recordAndMaybeThrottle("client1", 100, callback)
+      // remove all the sensors
+      metrics.removeSensor("producerThrottleTime-client1")
+      metrics.removeSensor("producer-client1")
+      // should not throw an exception
+      val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback)
+      assertTrue("Should be throttled", throttleTime > 0)
+
+      // all the sensors should get recreated
+      val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1")
+      assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
+
+      val byteRateSensor = metrics.getSensor("producer-client1")
+      assertTrue("Byte rate sensor should exist", byteRateSensor != null)
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
   def newMetrics: Metrics = {
     new Metrics(new MetricConfig(), Collections.emptyList(), time)
   }


Mime
View raw message