kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7098; Improve accuracy of throttling by avoiding under-estimating actual rate in Throttler
Date Fri, 20 Jul 2018 06:49:15 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 80b5530  KAFKA-7098; Improve accuracy of throttling by avoiding under-estimating
actual rate in Throttler
80b5530 is described below

commit 80b55309d1090b0bb45d571c5f6114461f6cb041
Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
AuthorDate: Thu Jul 19 23:48:26 2018 -0700

    KAFKA-7098; Improve accuracy of throttling by avoiding under-estimating actual rate in
Throttler
    
    Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
    
    Reviewers: Dong Lin <lindong28@gmail.com>
    
    Closes #5350 from hzxa21/KAFKA-7098
---
 core/src/main/scala/kafka/utils/Throttler.scala    |  2 +-
 .../scala/unit/kafka/utils/ThrottlerTest.scala     | 63 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index e781cd6..9fe3cdc 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -73,7 +73,7 @@ class Throttler(desiredRatePerSec: Double,
             time.sleep(sleepTime)
           }
         }
-        periodStartNs = now
+        periodStartNs = time.nanoseconds()
         observedSoFar = 0
       }
     }
diff --git a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
new file mode 100755
index 0000000..d26e791
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.utils
+
+import kafka.utils.Throttler
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Test
+import org.junit.Assert.{assertTrue, assertEquals}
+
+
+class ThrottlerTest {
+  @Test
+  def testThrottleDesiredRate() {
+    val throttleCheckIntervalMs = 100
+    val desiredCountPerSec = 1000.0
+    val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
+
+    val mockTime = new MockTime()
+    val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
+                                  checkIntervalMs = throttleCheckIntervalMs,
+                                  time = mockTime)
+
+    // Observe desiredCountPerInterval at t1
+    val t1 = mockTime.milliseconds()
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t1, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t2 = mockTime.milliseconds()
+    assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
+
+    // Observe desiredCountPerInterval at t2
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t2, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t3 = mockTime.milliseconds()
+    assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs)
+
+    val elapsedTimeMs = t3 - t1
+    val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
+    assertTrue(actualCountPerSec <= desiredCountPerSec)
+  }
+}


Mime
View raw message