kafka-commits mailing list archives

From jkr...@apache.org
Subject git commit: KAFKA-1468 Misc. improvements from benchmarking.
Date Sat, 31 May 2014 22:07:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 26eb1277e -> 02311c064


KAFKA-1468 Misc. improvements from benchmarking.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02311c06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02311c06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02311c06

Branch: refs/heads/trunk
Commit: 02311c0642b3358e8180191ea0542e985ed0f6dc
Parents: 26eb127
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Tue Apr 22 13:51:49 2014 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Sat May 31 15:05:45 2014 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   1 +
 .../kafka/clients/producer/ProducerConfig.java  |  12 +-
 .../clients/producer/internals/Sender.java      |  31 ++-
 .../clients/tools/ProducerPerformance.java      | 220 +++++++++++++++----
 .../kafka/clients/producer/SenderTest.java      |   3 +
 .../main/scala/kafka/server/KafkaConfig.scala   |   4 +-
 .../scala/kafka/server/RequestPurgatory.scala   |   6 +-
 .../scala/kafka/tools/TestEndToEndLatency.scala |  17 +-
 8 files changed, 230 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f1def50..d15562a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -131,6 +131,7 @@ public class KafkaProducer implements Producer {
                                  config.getInt(ProducerConfig.TIMEOUT_CONFIG),
                                  config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                                  config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+                                 config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                                  this.metrics,
                                  new SystemTime());
         this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index bc4074e..f9de4af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -167,6 +167,10 @@ public class ProducerConfig extends AbstractConfig {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
+    /** <code>max.in.flight.requests.per.connection</code> */
+    public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
+    private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking.";
+
     static {
         config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -202,7 +206,13 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC);
+                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+                                .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+                                        Type.INT,
+                                        5,
+                                        atLeast(1),
+                                        Importance.LOW,
+                                        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {
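
The new knob above caps client-side pipelining per connection. As a quick illustration (not
part of this commit), a client that wants to limit pipelining could set it roughly like this;
the class name and broker address are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class MaxInFlightExample {                // hypothetical example class
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // allow only one unacknowledged request per connection;
            // the default added by this commit is 5
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
            KafkaProducer producer = new KafkaProducer(props);
            producer.close();
        }
    }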

http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3e83ae0..9b1f565 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -120,6 +120,7 @@ public class Sender implements Runnable {
                   int requestTimeout,
                   int socketSendBuffer,
                   int socketReceiveBuffer,
+                  int maxInFlightRequestsPerConnection,
                   Metrics metrics,
                   Time time) {
         this.nodeStates = new NodeStates(reconnectBackoffMs);
@@ -134,7 +135,7 @@ public class Sender implements Runnable {
         this.retries = retries;
         this.socketSendBuffer = socketSendBuffer;
         this.socketReceiveBuffer = socketReceiveBuffer;
-        this.inFlightRequests = new InFlightRequests();
+        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
         this.correlation = 0;
         this.metadataFetchInProgress = false;
         this.time = time;
@@ -678,7 +679,13 @@ public class Sender implements Runnable {
      * A set of outstanding request queues for each node that have not yet received responses
      */
     private static final class InFlightRequests {
-        private final Map<Integer, Deque<InFlightRequest>> requests = new HashMap<Integer, Deque<InFlightRequest>>();
+        private final int maxInFlightRequestsPerConnection;
+        private final Map<Integer, Deque<InFlightRequest>> requests;
+
+        public InFlightRequests(int maxInFlightRequestsPerConnection) {
+            this.requests = new HashMap<Integer, Deque<InFlightRequest>>();
+            this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
+        }
 
         /**
          * Add the given request to the queue for the node it was directed to
@@ -714,7 +721,8 @@ public class Sender implements Runnable {
          */
         public boolean canSendMore(int node) {
             Deque<InFlightRequest> queue = requests.get(node);
-            return queue == null || queue.isEmpty() || queue.peekFirst().request.complete();
+            return queue == null || queue.isEmpty() ||
+                   (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection);
         }
 
         /**
@@ -762,11 +770,11 @@ public class Sender implements Runnable {
 
             this.queueTimeSensor = metrics.sensor("queue-time");
             this.queueTimeSensor.add("record-queue-time-avg",
-                "The average time in ms record batches spent in the record accumulator.",
-                new Avg());
+                                     "The average time in ms record batches spent in the
record accumulator.",
+                                     new Avg());
             this.queueTimeSensor.add("record-queue-time-max",
-                "The maximum time in ms record batches spent in the record accumulator.",
-                new Max());
+                                     "The maximum time in ms record batches spent in the
record accumulator.",
+                                     new Max());
 
             this.requestTimeSensor = metrics.sensor("request-time");
             this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
@@ -859,7 +867,8 @@ public class Sender implements Runnable {
             this.retrySensor.record(count, nowMs);
             String topicRetryName = "topic." + topic + ".record-retries";
             Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
-            if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs);
+            if (topicRetrySensor != null)
+              topicRetrySensor.record(count, nowMs);
         }
 
         public void recordErrors(String topic, int count) {
@@ -867,7 +876,8 @@ public class Sender implements Runnable {
             this.errorSensor.record(count, nowMs);
             String topicErrorName = "topic." + topic + ".record-errors";
             Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
-            if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs);
+            if (topicErrorSensor != null) 
+              topicErrorSensor.record(count, nowMs);
         }
 
         public void recordLatency(int node, long latency) {
@@ -876,7 +886,8 @@ public class Sender implements Runnable {
             if (node >= 0) {
                 String nodeTimeName = "node-" + node + ".latency";
                 Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
-                if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs);
+                if (nodeRequestTime != null)
+                  nodeRequestTime.record(latency, nowMs);
             }
         }
     }
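
A note on the gating rule above: with the limit in place, canSendMore() only allows another
request to a node when the request at the head of that node's deque reports complete()
(presumably meaning its send has finished being written) and the deque is still below the
configured maximum. A toy, self-contained sketch of that rule, with a boolean standing in for
each in-flight request's "send complete" flag (illustrative only, not the producer's own class):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class CanSendMoreSketch {
        static final int MAX_IN_FLIGHT = 5;          // default from the new config above

        // true if another request may be pipelined to this node
        static boolean canSendMore(Deque<Boolean> inFlightSendComplete) {
            return inFlightSendComplete.isEmpty()
                || (inFlightSendComplete.peekFirst() && inFlightSendComplete.size() < MAX_IN_FLIGHT);
        }

        public static void main(String[] args) {
            Deque<Boolean> node = new ArrayDeque<Boolean>();
            System.out.println(canSendMore(node));   // true: nothing outstanding
            for (int i = 0; i < MAX_IN_FLIGHT; i++)
                node.addFirst(Boolean.TRUE);         // fully-written but unacknowledged requests
            System.out.println(canSendMore(node));   // false: at the per-connection limit
        }
    }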

http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index eb18739..ac86150 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -17,68 +17,200 @@ import java.util.Properties;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.record.Records;
 
 public class ProducerPerformance {
 
+    private static final long NS_PER_MS = 1000000L;
+    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
     public static void main(String[] args) throws Exception {
-        if (args.length < 5) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName()
-                               + " url topic_name num_records record_size acks [compression_type]");
+        if (args.length < 4) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
+                               " topic_name num_records record_size target_records_sec [prop_name=prop_value]*");
             System.exit(1);
         }
-        String url = args[0];
-        String topicName = args[1];
-        int numRecords = Integer.parseInt(args[2]);
-        int recordSize = Integer.parseInt(args[3]);
-        int acks = Integer.parseInt(args[4]);
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks));
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
-        props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
-        props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
-        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024));
-        if (args.length == 6)
-            props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
 
+        /* parse args */
+        String topicName = args[0];
+        long numRecords = Long.parseLong(args[1]);
+        int recordSize = Integer.parseInt(args[2]);
+        int throughput = Integer.parseInt(args[3]);
+
+        Properties props = new Properties();
+        for (int i = 4; i < args.length; i++) {
+            String[] pieces = args[i].split("=");
+            if (pieces.length != 2)
+                throw new IllegalArgumentException("Invalid property: " + args[i]);
+            props.put(pieces[0], pieces[1]);
+        }
         KafkaProducer producer = new KafkaProducer(props);
-        Callback callback = new Callback() {
-            public void onCompletion(RecordMetadata metadata, Exception e) {
-                if (e != null)
-                    e.printStackTrace();
-            }
-        };
+
+        /* setup perf test */
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
         ProducerRecord record = new ProducerRecord(topicName, payload);
-        long start = System.currentTimeMillis();
-        long maxLatency = -1L;
-        long totalLatency = 0;
-        int reportingInterval = 1000000;
+        long sleepTime = NS_PER_SEC / throughput;
+        long sleepDeficitNs = 0;
+        Stats stats = new Stats(numRecords, 5000);
         for (int i = 0; i < numRecords; i++) {
             long sendStart = System.currentTimeMillis();
-            producer.send(record, callback);
-            long sendElapsed = System.currentTimeMillis() - sendStart;
-            maxLatency = Math.max(maxLatency, sendElapsed);
-            totalLatency += sendElapsed;
-            if (i % reportingInterval == 0) {
-                System.out.printf("%d  max latency = %d ms, avg latency = %.5f\n",
-                                  i,
-                                  maxLatency,
-                                  (totalLatency / (double) reportingInterval));
-                totalLatency = 0L;
-                maxLatency = -1L;
+            Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
+            producer.send(record, cb);
+
+            /*
+             * Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for
+             * times < 1 ms, so instead of sleeping after each send we wait until a minimum sleep
+             * time accumulates (the "sleep deficit") and then make up the whole deficit in one
+             * longer sleep.
+            if (throughput > 0) {
+                sleepDeficitNs += sleepTime;
+                if (sleepDeficitNs >= MIN_SLEEP_NS) {
+                    long sleepMs = sleepDeficitNs / 1000000;
+                    long sleepNs = sleepDeficitNs - sleepMs * 1000000;
+                    Thread.sleep(sleepMs, (int) sleepNs);
+                    sleepDeficitNs = 0;
+                }
             }
         }
-        long ellapsed = System.currentTimeMillis() - start;
-        double msgsSec = 1000.0 * numRecords / (double) ellapsed;
-        double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
+
+        /* print final results */
         producer.close();
-        System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n",
numRecords, ellapsed, msgsSec, mbSec);
+        stats.printTotal();
+    }
+
+    private static class Stats {
+        private long start;
+        private long windowStart;
+        private int[] latencies;
+        private int sampling;
+        private int iteration;
+        private int index;
+        private long count;
+        private long bytes;
+        private int maxLatency;
+        private long totalLatency;
+        private long windowCount;
+        private int windowMaxLatency;
+        private long windowTotalLatency;
+        private long windowBytes;
+        private long reportingInterval;
+
+        public Stats(long numRecords, int reportingInterval) {
+            this.start = System.currentTimeMillis();
+            this.windowStart = System.currentTimeMillis();
+            this.index = 0;
+            this.iteration = 0;
+            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
+            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
+            this.index = 0;
+            this.maxLatency = 0;
+            this.totalLatency = 0;
+            this.windowCount = 0;
+            this.windowMaxLatency = 0;
+            this.windowTotalLatency = 0;
+            this.windowBytes = 0;
+            this.totalLatency = 0;
+            this.reportingInterval = reportingInterval;
+        }
+
+        public void record(int iter, int latency, int bytes, long time) {
+            this.count++;
+            this.bytes += bytes;
+            this.totalLatency += latency;
+            this.maxLatency = Math.max(this.maxLatency, latency);
+            this.windowCount++;
+            this.windowBytes += bytes;
+            this.windowTotalLatency += latency;
+            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
+            if (iter % this.sampling == 0) {
+                this.latencies[index] = latency;
+                this.index++;
+            }
+            /* maybe report the recent perf */
+            if (time - windowStart >= reportingInterval) {
+                printWindow();
+                newWindow();
+            }
+        }
+
+        public Callback nextCompletion(long start, int bytes, Stats stats) {
+            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
+            this.iteration++;
+            return cb;
+        }
+
+        public void printWindow() {
+            long ellapsed = System.currentTimeMillis() - windowStart;
+            double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
+            double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
+            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg
latency, %.1f max latency.\n",
+                              windowCount,
+                              recsPerSec,
+                              mbPerSec,
+                              windowTotalLatency / (double) windowCount,
+                              (double) windowMaxLatency);
+        }
+
+        public void newWindow() {
+            this.windowStart = System.currentTimeMillis();
+            this.windowCount = 0;
+            this.windowMaxLatency = 0;
+            this.windowTotalLatency = 0;
+            this.windowBytes = 0;
+        }
+
+        public void printTotal() {
+            long ellapsed = System.currentTimeMillis() - start;
+            double recsPerSec = 1000.0 * count / (double) ellapsed;
+            double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
+            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
+            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg
latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
+                              count,
+                              recsPerSec,
+                              mbPerSec,
+                              totalLatency / (double) count,
+                              (double) maxLatency,
+                              percs[0],
+                              percs[1],
+                              percs[2],
+                              percs[3]);
+        }
+
+        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
+            int size = Math.min(count, latencies.length);
+            Arrays.sort(latencies, 0, size);
+            int[] values = new int[percentiles.length];
+            for (int i = 0; i < percentiles.length; i++) {
+                int index = (int) (percentiles[i] * size);
+                values[i] = latencies[index];
+            }
+            return values;
+        }
+    }
+
+    private static final class PerfCallback implements Callback {
+        private final long start;
+        private final int iteration;
+        private final int bytes;
+        private final Stats stats;
+
+        public PerfCallback(int iter, long start, int bytes, Stats stats) {
+            this.start = start;
+            this.stats = stats;
+            this.iteration = iter;
+            this.bytes = bytes;
+        }
+
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            long now = System.currentTimeMillis();
+            int latency = (int) (now - start);
+            this.stats.record(iteration, latency, bytes, now);
+            if (exception != null)
+                exception.printStackTrace();
+        }
     }
 
 }
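
Two things are worth noting about the rewritten ProducerPerformance. First, the argument list
changed; an invocation might now look like this (topic name, counts, and the trailing producer
properties are illustrative, and the classpath is assumed to be set):

    java org.apache.kafka.clients.tools.ProducerPerformance perf-test 50000000 100 100000 bootstrap.servers=localhost:9092 acks=1

Second, the throughput throttle in the send loop avoids sub-millisecond sleeps by accumulating
a "sleep deficit" and paying it back only once at least MIN_SLEEP_NS is owed. A stripped-down
sketch of that accounting, outside the producer (constants and the target rate mirror the tool's):

    public class SleepDeficitSketch {
        static final long NS_PER_MS = 1000000L;
        static final long NS_PER_SEC = 1000 * NS_PER_MS;
        static final long MIN_SLEEP_NS = 2 * NS_PER_MS;

        public static void main(String[] args) throws InterruptedException {
            int throughput = 100000;                  // target records/sec (illustrative)
            long sleepTime = NS_PER_SEC / throughput; // 10,000 ns owed per record
            long sleepDeficitNs = 0;
            for (int i = 0; i < 1000; i++) {
                // ... a record would be sent here ...
                sleepDeficitNs += sleepTime;
                if (sleepDeficitNs >= MIN_SLEEP_NS) { // sleep only once >= 2 ms is owed
                    long sleepMs = sleepDeficitNs / NS_PER_MS;
                    long sleepNs = sleepDeficitNs - sleepMs * NS_PER_MS;
                    Thread.sleep(sleepMs, (int) sleepNs);
                    sleepDeficitNs = 0;
                }
            }
        }
    }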

http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index a2b7722..3ef692c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -50,6 +50,7 @@ public class SenderTest {
     private static final int REQUEST_TIMEOUT_MS = 10000;
     private static final int SEND_BUFFER_SIZE = 64 * 1024;
     private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
+    private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE;
 
     private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
@@ -70,6 +71,7 @@ public class SenderTest {
                                        REQUEST_TIMEOUT_MS,
                                        SEND_BUFFER_SIZE,
                                        RECEIVE_BUFFER_SIZE,
+                                       MAX_IN_FLIGHT_REQS,
                                        metrics,
                                        time);
 
@@ -115,6 +117,7 @@ public class SenderTest {
                                    REQUEST_TIMEOUT_MS,
                                    SEND_BUFFER_SIZE,
                                    RECEIVE_BUFFER_SIZE,
+                                   MAX_IN_FLIGHT_REQS,
                                    new Metrics(),
                                    time);
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c7508d5..ef75b67 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -215,10 +215,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
 
   /* the purge interval (in number of requests) of the fetch request purgatory */
-  val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
+  val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 1000)
 
   /* the purge interval (in number of requests) of the producer request purgatory */
-  val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
+  val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 1000)
 
   /* Enables auto leader balancing. A background thread checks and triggers leader
    * balance if required at regular intervals */

http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index c064c5c..3d0ff1e 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -61,7 +61,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
  * this function handles delayed requests that have hit their time limit without being satisfied.
  *
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000)
         extends Logging with KafkaMetricsGroup {
 
   /* a list of requests watching each key */
@@ -137,8 +137,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
    */
   private class Watchers {
 
-
-    private val requests = new util.ArrayList[T]
+    private val requests = new util.LinkedList[T]
 
     def numRequests = requests.size
 
@@ -217,6 +216,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
             }
           }
           if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
+            debug("Beginning purgatory purge")
             requestCounter.set(0)
             val purged = purgeSatisfied()
             debug("Purged %d requests from delay queue.".format(purged))

http://git-wip-us.apache.org/repos/asf/kafka/blob/02311c06/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 37a9ec2..5f8f6bc 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -18,6 +18,7 @@
 package kafka.tools
 
 import java.util.Properties
+import java.util.Arrays
 import kafka.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
 
@@ -35,9 +36,10 @@ object TestEndToEndLatency {
 
     val consumerProps = new Properties()
     consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit.enable", "true")
+    consumerProps.put("auto.commit.enable", "false")
     consumerProps.put("auto.offset.reset", "largest")
     consumerProps.put("zookeeper.connect", zkConnect)
+    consumerProps.put("fetch.wait.max.ms", "1")
     consumerProps.put("socket.timeout.ms", 1201000.toString)
 
     val config = new ConsumerConfig(consumerProps)
@@ -53,19 +55,26 @@ object TestEndToEndLatency {
 
     val message = "hello there beautiful".getBytes
     var totalTime = 0.0
+    val latencies = new Array[Long](numMessages)
     for (i <- 0 until numMessages) {
       var begin = System.nanoTime
-      val response = producer.send(new ProducerRecord(topic, message))
-      response.get()
+      producer.send(new ProducerRecord(topic, message))
       val received = iter.next
       val elapsed = System.nanoTime - begin
       // poor man's progress bar
       if (i % 1000 == 0)
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
+      latencies(i) = (elapsed / 1000 / 1000)
     }
-    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
+    println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
+    Arrays.sort(latencies)
+    val p50 = latencies((latencies.length * 0.5).toInt)
+    val p99 = latencies((latencies.length * 0.99).toInt) 
+    val p999 = latencies((latencies.length * 0.999).toInt)
+    println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
     producer.close()
+    connector.commitOffsets(true)
     connector.shutdown()
     System.exit(0)
   }
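
The percentile readout added here uses simple order-statistic indexing after sorting: for N
recorded latencies, the p-th quantile is read as latencies((N * p).toInt). For example, with
numMessages = 10000, the 99.9th percentile is latencies(9990), i.e. the 9991st smallest sample.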

