kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1807 Improve accuracy of ProducerPerformance target throughput; reviewed by Neha Narkhede
Date Fri, 05 Dec 2014 17:18:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 09e2fd651 -> 4fc74958e


KAFKA-1807 Improve accuracy of ProducerPerformance target throughput; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4fc74958
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4fc74958
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4fc74958

Branch: refs/heads/trunk
Commit: 4fc74958e3569f6179c11fddace99b674c81f370
Parents: 09e2fd6
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Fri Dec 5 09:18:26 2014 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Dec 5 09:18:34 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/tools/ProducerPerformance.java    | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4fc74958/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index ac86150..28175fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -55,6 +55,7 @@ public class ProducerPerformance {
         long sleepTime = NS_PER_SEC / throughput;
         long sleepDeficitNs = 0;
         Stats stats = new Stats(numRecords, 5000);
+        long start = System.currentTimeMillis();
         for (int i = 0; i < numRecords; i++) {
             long sendStart = System.currentTimeMillis();
             Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
@@ -66,12 +67,15 @@ public class ProducerPerformance {
              * and then make up the whole deficit in one longer sleep.
              */
             if (throughput > 0) {
-                sleepDeficitNs += sleepTime;
-                if (sleepDeficitNs >= MIN_SLEEP_NS) {
-                    long sleepMs = sleepDeficitNs / 1000000;
-                    long sleepNs = sleepDeficitNs - sleepMs * 1000000;
-                    Thread.sleep(sleepMs, (int) sleepNs);
-                    sleepDeficitNs = 0;
+                float elapsed = (sendStart - start)/1000.f;
+                if (elapsed > 0 && i/elapsed > throughput) {
+                    sleepDeficitNs += sleepTime;
+                    if (sleepDeficitNs >= MIN_SLEEP_NS) {
+                        long sleepMs = sleepDeficitNs / 1000000;
+                        long sleepNs = sleepDeficitNs - sleepMs * 1000000;
+                        Thread.sleep(sleepMs, (int) sleepNs);
+                        sleepDeficitNs = 0;
+                    }
                 }
             }
         }


Mime
View raw message