kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2489: add benchmark for new consumer
Date Wed, 09 Sep 2015 01:00:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f8902981c -> b8b1bca44


KAFKA-2489: add benchmark for new consumer

ewencp
The changes here are smaller than they look - mostly refactoring/cleanup.

- ConsumerPerformanceService: added new_consumer flag, and exposed more command-line settings
- benchmark.py: refactored to use `parametrize` and `matrix` - this reduced some amount of repeated code
- benchmark.py: added consumer performance tests with new consumer (using `parametrize`)
- benchmark.py: added more detailed test descriptions
- performance.py: broke into separate files

Author: Geoff Anderson <geoff@confluent.io>

Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Gwen Shapira

Closes #179 from granders/KAFKA-2489-benchmark-new-consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8b1bca4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8b1bca4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8b1bca4

Branch: refs/heads/trunk
Commit: b8b1bca44095df0481fc6bb0d7ea5c06686b9337
Parents: f890298
Author: Geoff Anderson <geoff@confluent.io>
Authored: Tue Sep 8 17:59:49 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Sep 8 17:59:49 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   2 +-
 .../scala/kafka/tools/ConsumerPerformance.scala |  44 +++-
 tests/kafkatest/services/performance.py         | 163 ------------
 .../kafkatest/services/performance/__init__.py  |  19 ++
 .../performance/consumer_performance.py         | 147 +++++++++++
 .../services/performance/end_to_end_latency.py  |  59 +++++
 .../services/performance/performance.py         |  29 +++
 .../performance/producer_performance.py         |  76 ++++++
 .../templates/tools_log4j.properties            |  26 ++
 .../services/templates/tools_log4j.properties   |   2 +-
 tests/kafkatest/tests/benchmark_test.py         | 259 +++++++------------
 .../clients/tools/ProducerPerformance.java      |   6 +-
 12 files changed, 494 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 382c388..19ef6eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -155,7 +155,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * called <i>test</i> as described above.
  * <p>
  * The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
- * consumer will automatically ping the cluster periodically, which let's the cluster know that it is alive. As long as
+ * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
  * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
  * to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be
  * considered dead and its partitions will be assigned to another process.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 0078b00..1be4b23 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -17,11 +17,15 @@
 
 package kafka.tools
 
+import java.util
+
+import org.apache.kafka.common.TopicPartition
+
 import scala.collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import kafka.utils.CommandLineUtils
 import java.util.{ Random, Properties }
@@ -58,7 +62,7 @@ object ConsumerPerformance {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
       consumer.subscribe(List(config.topic))
       startMs = System.currentTimeMillis
-      consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
+      consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
       endMs = System.currentTimeMillis
       consumer.close()
     } else {
@@ -93,18 +97,40 @@ object ConsumerPerformance {
     }
   }
   
-  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
     var bytesRead = 0L
     var messagesRead = 0L
-    val startMs = System.currentTimeMillis
-    var lastReportTime: Long = startMs
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var lastConsumed = System.currentTimeMillis
-    while(messagesRead < count && lastConsumed >= System.currentTimeMillis - timeout) {
+
+    // Wait for group join, metadata fetch, etc
+    val joinTimeout = 10000
+    val isAssigned = new AtomicBoolean(false)
+    consumer.subscribe(topics, new ConsumerRebalanceListener {
+      def onPartitionsAssigned(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+        isAssigned.set(true)
+      }
+      def onPartitionsRevoked(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+        isAssigned.set(false)
+      }})
+    val joinStart = System.currentTimeMillis()
+    while (!isAssigned.get()) {
+      if (System.currentTimeMillis() - joinStart >= joinTimeout) {
+        throw new Exception("Timed out waiting for initial group join.")
+      }
+      consumer.poll(100)
+    }
+    consumer.seekToBeginning()
+
+    // Now start the benchmark
+    val startMs = System.currentTimeMillis
+    var lastReportTime: Long = startMs
+    var lastConsumedTime = System.currentTimeMillis
+    
+    while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime <= timeout) {
       val records = consumer.poll(100)
       if(records.count() > 0)
-        lastConsumed = System.currentTimeMillis
+        lastConsumedTime = System.currentTimeMillis
       for(record <- records) {
         messagesRead += 1
         if(record.key != null)
@@ -121,6 +147,7 @@ object ConsumerPerformance {
         }
       }
     }
+    
     totalMessagesRead.set(messagesRead)
     totalBytesRead.set(bytesRead)
   }
@@ -255,5 +282,4 @@ object ConsumerPerformance {
     }
 
   }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py
deleted file mode 100644
index 34892e0..0000000
--- a/tests/kafkatest/services/performance.py
+++ /dev/null
@@ -1,163 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.services.background_thread import BackgroundThreadService
-
-
-class PerformanceService(BackgroundThreadService):
-    def __init__(self, context, num_nodes):
-        super(PerformanceService, self).__init__(context, num_nodes)
-        self.results = [None] * self.num_nodes
-        self.stats = [[] for x in range(self.num_nodes)]
-
-
-class ProducerPerformanceService(PerformanceService):
-    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
-        super(ProducerPerformanceService, self).__init__(context, num_nodes)
-        self.kafka = kafka
-        self.args = {
-            'topic': topic,
-            'num_records': num_records,
-            'record_size': record_size,
-            'throughput': throughput
-        }
-        self.settings = settings
-        self.intermediate_stats = intermediate_stats
-
-    def _worker(self, idx, node):
-        args = self.args.copy()
-        args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
-        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
-              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
-
-        for key,value in self.settings.items():
-            cmd += " %s=%s" % (str(key), str(value))
-        self.logger.debug("Producer performance %d command: %s", idx, cmd)
-
-        def parse_stats(line):
-            parts = line.split(',')
-            return {
-                'records': int(parts[0].split()[0]),
-                'records_per_sec': float(parts[1].split()[0]),
-                'mbps': float(parts[1].split('(')[1].split()[0]),
-                'latency_avg_ms': float(parts[2].split()[0]),
-                'latency_max_ms': float(parts[3].split()[0]),
-                'latency_50th_ms': float(parts[4].split()[0]),
-                'latency_95th_ms': float(parts[5].split()[0]),
-                'latency_99th_ms': float(parts[6].split()[0]),
-                'latency_999th_ms': float(parts[7].split()[0]),
-            }
-        last = None
-        for line in node.account.ssh_capture(cmd):
-            self.logger.debug("Producer performance %d: %s", idx, line.strip())
-            if self.intermediate_stats:
-                try:
-                    self.stats[idx-1].append(parse_stats(line))
-                except:
-                    # Sometimes there are extraneous log messages
-                    pass
-            last = line
-        try:
-            self.results[idx-1] = parse_stats(last)
-        except:
-            self.logger.error("Bad last line: %s", last)
-
-
-class ConsumerPerformanceService(PerformanceService):
-    def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}):
-        super(ConsumerPerformanceService, self).__init__(context, num_nodes)
-        self.kafka = kafka
-        self.args = {
-            'topic': topic,
-            'num_records': num_records,
-            'throughput': throughput,
-            'threads': threads,
-        }
-        self.settings = settings
-
-    def _worker(self, idx, node):
-        args = self.args.copy()
-        args.update({'zk_connect': self.kafka.zk.connect_setting()})
-        cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\
-              "--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args
-        for key,value in self.settings.items():
-            cmd += " %s=%s" % (str(key), str(value))
-        self.logger.debug("Consumer performance %d command: %s", idx, cmd)
-        last = None
-        for line in node.account.ssh_capture(cmd):
-            self.logger.debug("Consumer performance %d: %s", idx, line.strip())
-            last = line
-        # Parse and save the last line's information
-        parts = last.split(',')
-
-        self.results[idx-1] = {
-            'total_mb': float(parts[2]),
-            'mbps': float(parts[3]),
-            'records_per_sec': float(parts[5]),
-        }
-
-
-class EndToEndLatencyService(PerformanceService):
-    def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
-        super(EndToEndLatencyService, self).__init__(context, num_nodes)
-        self.kafka = kafka
-        self.args = {
-            'topic': topic,
-            'num_records': num_records,
-            'consumer_fetch_max_wait': consumer_fetch_max_wait,
-            'acks': acks
-        }
-
-    def _worker(self, idx, node):
-        args = self.args.copy()
-        args.update({
-            'zk_connect': self.kafka.zk.connect_setting(),
-            'bootstrap_servers': self.kafka.bootstrap_servers(),
-        })
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
-              "%(bootstrap_servers)s %(topic)s %(num_records)d "\
-              "%(acks)d 20" % args
-        self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
-        results = {}
-        for line in node.account.ssh_capture(cmd):
-            self.logger.debug("End-to-end latency %d: %s", idx, line.strip())
-            if line.startswith("Avg latency:"):
-                results['latency_avg_ms'] = float(line.split()[2])
-            if line.startswith("Percentiles"):
-                results['latency_50th_ms'] = float(line.split()[3][:-1])
-                results['latency_99th_ms'] = float(line.split()[6][:-1])
-                results['latency_999th_ms'] = float(line.split()[9])
-        self.results[idx-1] = results
-
-
-def parse_performance_output(summary):
-        parts = summary.split(',')
-        results = {
-            'records': int(parts[0].split()[0]),
-            'records_per_sec': float(parts[1].split()[0]),
-            'mbps': float(parts[1].split('(')[1].split()[0]),
-            'latency_avg_ms': float(parts[2].split()[0]),
-            'latency_max_ms': float(parts[3].split()[0]),
-            'latency_50th_ms': float(parts[4].split()[0]),
-            'latency_95th_ms': float(parts[5].split()[0]),
-            'latency_99th_ms': float(parts[6].split()[0]),
-            'latency_999th_ms': float(parts[7].split()[0]),
-        }
-        # To provide compatibility with ConsumerPerformanceService
-        results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec'])
-        results['rate_mbps'] = results['mbps']
-        results['rate_mps'] = results['records_per_sec']
-
-        return results

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/performance/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py
new file mode 100644
index 0000000..a72e3b7
--- /dev/null
+++ b/tests/kafkatest/services/performance/__init__.py
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from performance import PerformanceService
+from end_to_end_latency import EndToEndLatencyService
+from producer_performance import ProducerPerformanceService
+from consumer_performance import ConsumerPerformanceService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
new file mode 100644
index 0000000..ecaef43
--- /dev/null
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.performance import PerformanceService
+
+import os
+
+
+class ConsumerPerformanceService(PerformanceService):
+    """
+        See ConsumerPerformance.scala as the source of truth on these settings, but for reference:
+
+        "zookeeper" "The connection string for the zookeeper connection in the form host:port. Multiple URLS can
+                     be given to allow fail-over. This option is only used with the old consumer."
+
+        "broker-list", "A broker list to use for connecting if using the new consumer."
+
+        "topic", "REQUIRED: The topic to consume from."
+
+        "group", "The group id to consume on."
+
+        "fetch-size", "The amount of data to fetch in a single request."
+
+        "from-latest", "If the consumer does not already have an establishedoffset to consume from,
+                        start with the latest message present in the log rather than the earliest message."
+
+        "socket-buffer-size", "The size of the tcp RECV size."
+
+        "threads", "Number of processing threads."
+
+        "num-fetch-threads", "Number of fetcher threads. Defaults to 1"
+
+        "new-consumer", "Use the new consumer implementation."
+    """
+
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/consumer_performance"
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout")
+    LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+
+    logs = {
+        "consumer_performance_output": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": True},
+
+        "consumer_performance_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
+        super(ConsumerPerformanceService, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.topic = topic
+        self.messages = messages
+        self.new_consumer = new_consumer
+        self.settings = settings
+
+        # These less-frequently used settings can be updated manually after instantiation
+        self.fetch_size = None
+        self.socket_buffer_size = None
+        self.threads = None
+        self.num_fetch_threads = None
+        self.group = None
+        self.from_latest = None
+
+    @property
+    def args(self):
+        """Dictionary of arguments used to start the Consumer Performance script."""
+        args = {
+            'topic': self.topic,
+            'messages': self.messages,
+        }
+
+        if self.new_consumer:
+            args['new-consumer'] = ""
+            args['broker-list'] = self.kafka.bootstrap_servers()
+        else:
+            args['zookeeper'] = self.kafka.zk.connect_setting()
+
+        if self.fetch_size is not None:
+            args['fetch-size'] = self.fetch_size
+
+        if self.socket_buffer_size is not None:
+            args['socket-buffer-size'] = self.socket_buffer_size
+
+        if self.threads is not None:
+            args['threads'] = self.threads
+
+        if self.num_fetch_threads is not None:
+            args['num-fetch-threads'] = self.num_fetch_threads
+
+        if self.group is not None:
+            args['group'] = self.group
+
+        if self.from_latest:
+            args['from-latest'] = ""
+
+        return args
+
+    @property
+    def start_cmd(self):
+        cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
+        cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
+        for key, value in self.args.items():
+            cmd += " --%s %s" % (key, value)
+
+        for key, value in self.settings.items():
+            cmd += " %s=%s" % (str(key), str(value))
+
+        cmd += " | tee %s" % ConsumerPerformanceService.STDOUT_CAPTURE
+        return cmd
+
+    def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
+
+        log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE)
+        node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config)
+
+        cmd = self.start_cmd
+        self.logger.debug("Consumer performance %d command: %s", idx, cmd)
+        last = None
+        for line in node.account.ssh_capture(cmd):
+            last = line
+        # Parse and save the last line's information
+        parts = last.split(',')
+
+        self.results[idx-1] = {
+            'total_mb': float(parts[2]),
+            'mbps': float(parts[3]),
+            'records_per_sec': float(parts[5]),
+        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
new file mode 100644
index 0000000..4c61a93
--- /dev/null
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.performance import PerformanceService
+
+
+class EndToEndLatencyService(PerformanceService):
+
+    logs = {
+        "end_to_end_latency_log": {
+            "path": "/mnt/end-to-end-latency.log",
+            "collect_default": True},
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+        super(EndToEndLatencyService, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.args = {
+            'topic': topic,
+            'num_records': num_records,
+            'consumer_fetch_max_wait': consumer_fetch_max_wait,
+            'acks': acks
+        }
+
+    def _worker(self, idx, node):
+        args = self.args.copy()
+        args.update({
+            'zk_connect': self.kafka.zk.connect_setting(),
+            'bootstrap_servers': self.kafka.bootstrap_servers(),
+        })
+
+        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
+              "%(bootstrap_servers)s %(topic)s %(num_records)d "\
+              "%(acks)d 20" % args
+
+        cmd += " | tee /mnt/end-to-end-latency.log"
+
+        self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
+        results = {}
+        for line in node.account.ssh_capture(cmd):
+            if line.startswith("Avg latency:"):
+                results['latency_avg_ms'] = float(line.split()[2])
+            if line.startswith("Percentiles"):
+                results['latency_50th_ms'] = float(line.split()[3][:-1])
+                results['latency_99th_ms'] = float(line.split()[6][:-1])
+                results['latency_999th_ms'] = float(line.split()[9])
+        self.results[idx-1] = results

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
new file mode 100644
index 0000000..6d286f6
--- /dev/null
+++ b/tests/kafkatest/services/performance/performance.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+
+class PerformanceService(BackgroundThreadService):
+
+    def __init__(self, context, num_nodes):
+        super(PerformanceService, self).__init__(context, num_nodes)
+        self.results = [None] * self.num_nodes
+        self.stats = [[] for x in range(self.num_nodes)]
+
+    def clean_node(self, node):
+        node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf /mnt/*", allow_fail=False)
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
new file mode 100644
index 0000000..c46a910
--- /dev/null
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.performance import PerformanceService
+
+
+class ProducerPerformanceService(PerformanceService):
+
+    logs = {
+        "producer_performance_log": {
+            "path": "/mnt/producer-performance.log",
+            "collect_default": True},
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
+        super(ProducerPerformanceService, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.args = {
+            'topic': topic,
+            'num_records': num_records,
+            'record_size': record_size,
+            'throughput': throughput
+        }
+        self.settings = settings
+        self.intermediate_stats = intermediate_stats
+
+    def _worker(self, idx, node):
+        args = self.args.copy()
+        args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
+        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
+              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\
+              " | tee /mnt/producer-performance.log" % args
+
+        for key, value in self.settings.items():
+            cmd += " %s=%s" % (str(key), str(value))
+        self.logger.debug("Producer performance %d command: %s", idx, cmd)
+
+        def parse_stats(line):
+            parts = line.split(',')
+            return {
+                'records': int(parts[0].split()[0]),
+                'records_per_sec': float(parts[1].split()[0]),
+                'mbps': float(parts[1].split('(')[1].split()[0]),
+                'latency_avg_ms': float(parts[2].split()[0]),
+                'latency_max_ms': float(parts[3].split()[0]),
+                'latency_50th_ms': float(parts[4].split()[0]),
+                'latency_95th_ms': float(parts[5].split()[0]),
+                'latency_99th_ms': float(parts[6].split()[0]),
+                'latency_999th_ms': float(parts[7].split()[0]),
+            }
+        last = None
+        for line in node.account.ssh_capture(cmd):
+            if self.intermediate_stats:
+                try:
+                    self.stats[idx-1].append(parse_stats(line))
+                except:
+                    # Sometimes there are extraneous log messages
+                    pass
+
+            last = line
+        try:
+            self.results[idx-1] = parse_stats(last)
+        except:
+            raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/performance/templates/tools_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/templates/tools_log4j.properties b/tests/kafkatest/services/performance/templates/tools_log4j.properties
new file mode 100644
index 0000000..ce30d52
--- /dev/null
+++ b/tests/kafkatest/services/performance/templates/tools_log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define the root logger with appender file
+log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File={{ log_file }}
+log4j.appender.FILE.ImmediateFlush=true
+log4j.appender.FILE.Threshold=debug
+# Set the append to false, overwrite
+log4j.appender.FILE.Append=false
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/services/templates/tools_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties
index e63e6d6..ce30d52 100644
--- a/tests/kafkatest/services/templates/tools_log4j.properties
+++ b/tests/kafkatest/services/templates/tools_log4j.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 # Define the root logger with appender file
-log4j.rootLogger = INFO, FILE
+log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
 
 log4j.appender.FILE=org.apache.log4j.FileAppender
 log4j.appender.FILE.File={{ log_file }}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tests/kafkatest/tests/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py
index b01f27b..02503ec 100644
--- a/tests/kafkatest/tests/benchmark_test.py
+++ b/tests/kafkatest/tests/benchmark_test.py
@@ -14,136 +14,90 @@
 # limitations under the License.
 
 from ducktape.services.service import Service
+from ducktape.mark import parametrize
+from ducktape.mark import matrix
 
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
+from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
+
+
+TOPIC_REP_ONE = "topic-replication-factor-one"
+TOPIC_REP_THREE = "topic-replication-factor-three"
+DEFAULT_RECORD_SIZE = 100  # bytes
 
 
 class Benchmark(KafkaTest):
-    '''A benchmark of Kafka producer/consumer performance. This replicates the test
+    """A benchmark of Kafka producer/consumer performance. This replicates the test
     run here:
     https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
-    '''
+    """
     def __init__(self, test_context):
         super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
-            'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
-            'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
+            TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
+            TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
         })
 
-        if True:
-            # Works on both aws and local
-            self.msgs = 1000000
-            self.msgs_default = 1000000
-        else:
-            # Can use locally on Vagrant VMs, but may use too much memory for aws
-            self.msgs = 50000000
-            self.msgs_default = 50000000
-
         self.msgs_large = 10000000
-        self.msg_size_default = 100
         self.batch_size = 8*1024
         self.buffer_memory = 64*1024*1024
         self.msg_sizes = [10, 100, 1000, 10000, 100000]
         self.target_data_size = 128*1024*1024
         self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
 
-    def test_single_producer_no_replication(self):
-        self.logger.info("BENCHMARK: Single producer, no replication")
-        self.perf = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
-        )
-        self.perf.run()
-        data = compute_throughput(self.perf)
-        self.logger.info("Single producer, no replication: %s", str(data))
-        return data
+    @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
+    @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE)
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000])
+    def test_producer_throughput(self, acks, topic, num_producers, message_size):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
+        and message size are varied depending on arguments injected into this test.
+
+        Collect and return aggregate throughput statistics after all messages have been acknowledged.
+        (This runs ProducerPerformance.java under the hood)
+        """
+        # Always generate the same total amount of data
+        nrecords = int(self.target_data_size / message_size)
 
-    def test_single_producer_replication(self):
-        self.logger.info("BENCHMARK: Single producer, async 3x replication")
-        self.perf = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
-        )
-        self.perf.run()
-        data = compute_throughput(self.perf)
-        self.logger.info("Single producer, async 3x replication: %s" % str(data))
-        return data
-
-    def test_single_producer_sync(self):
-        self.logger.info("BENCHMARK: Single producer, sync 3x replication")
-        self.perf = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
-        )
-        self.perf.run()
-
-        data = compute_throughput(self.perf)
-        self.logger.info("Single producer, sync 3x replication: %s" % data)
-        return data
-
-    def test_three_producers_async(self):
-        self.logger.info("BENCHMARK: Three producers, async 3x replication")
-        self.perf = ProducerPerformanceService(
-            self.test_context, 3, self.kafka,
-            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
-        )
-        self.perf.run()
+        self.producer = ProducerPerformanceService(
+            self.test_context, num_producers, self.kafka, topic=topic,
+            num_records=nrecords, record_size=message_size,  throughput=-1,
+            settings={
+                'acks': acks,
+                'batch.size': self.batch_size,
+                'buffer.memory': self.buffer_memory})
+        self.producer.run()
+        return compute_aggregate_throughput(self.producer)
 
-        data = compute_throughput(self.perf)
-        self.logger.info("Three producers, async 3x replication: %s" % data)
-        return data
+    def test_long_term_producer_throughput(self):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
 
-    def test_multiple_message_size(self):
-        # TODO this would be a great place to use parametrization
-        self.perfs = {}
-        for msg_size in self.msg_sizes:
-            self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
-            # Always generate the same total amount of data
-            nrecords = int(self.target_data_size / msg_size)
-            self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
-                self.test_context, 1, self.kafka,
-                topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
-                settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
-            )
-
-        self.msg_size_perf = {}
-        for msg_size in self.msg_sizes:
-            perf = self.perfs["perf-" + str(msg_size)]
-            perf.run()
-            self.msg_size_perf[msg_size] = perf
-
-        summary = ["Message size:"]
-        data = {}
-        for msg_size in self.msg_sizes:
-            datum = compute_throughput(self.msg_size_perf[msg_size])
-            summary.append(" %d: %s" % (msg_size, datum))
-            data[msg_size] = datum
-        self.logger.info("\n".join(summary))
-        return data
+        Collect and return aggregate throughput statistics after all messages have been acknowledged.
 
-    def test_long_term_throughput(self):
-        self.logger.info("BENCHMARK: Long production")
-        self.perf = ProducerPerformanceService(
+        (This runs ProducerPerformance.java under the hood)
+        """
+        self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
-            topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory},
+            topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
+            throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
             intermediate_stats=True
         )
-        self.perf.run()
+        self.producer.run()
 
         summary = ["Throughput over long run, data > memory:"]
         data = {}
         # FIXME we should be generating a graph too
         # Try to break it into 5 blocks, but fall back to a smaller number if
         # there aren't even 5 elements
-        block_size = max(len(self.perf.stats[0]) / 5, 1)
-        nblocks = len(self.perf.stats[0]) / block_size
+        block_size = max(len(self.producer.stats[0]) / 5, 1)
+        nblocks = len(self.producer.stats[0]) / block_size
+
         for i in range(nblocks):
-            subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
+            subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
             if len(subset) == 0:
                 summary.append(" Time block %d: (empty)" % i)
                 data[i] = None
@@ -158,35 +112,48 @@ class Benchmark(KafkaTest):
         return data
 
     def test_end_to_end_latency(self):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
+        measuring the latency between production and consumption of each message.
+
+        Return aggregate latency statistics.
+
+        (Under the hood, this simply runs EndToEndLatency.scala)
+        """
         self.logger.info("BENCHMARK: End to end latency")
         self.perf = EndToEndLatencyService(
             self.test_context, 1, self.kafka,
-            topic="test-rep-three", num_records=10000
+            topic=TOPIC_REP_THREE, num_records=10000
         )
         self.perf.run()
+        return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
 
-        data = latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
-        self.logger.info("End-to-end latency: %s" % str(data))
-        return data
+    @matrix(new_consumer=[True, False])
+    def test_producer_and_consumer(self, new_consumer=False):
+        """
+        Setup: 1 node zk + 3 node kafka cluster
+        Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
+        using new consumer if new_consumer == True
+
+        Return aggregate throughput statistics for both producer and consumer.
+
+        (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
+        """
+        num_records = 10 * 1000 * 1000  # 10e6
 
-    def test_producer_and_consumer(self):
-        self.logger.info("BENCHMARK: Producer + Consumer")
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
-            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+            topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
-
         self.consumer = ConsumerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
-        )
-
+            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
         Service.run_parallel(self.producer, self.consumer)
 
         data = {
-            "producer": compute_throughput(self.producer),
-            "consumer": compute_throughput(self.consumer)
+            "producer": compute_aggregate_throughput(self.producer),
+            "consumer": compute_aggregate_throughput(self.consumer)
         }
         summary = [
             "Producer + consumer:",
@@ -194,49 +161,29 @@ class Benchmark(KafkaTest):
         self.logger.info("\n".join(summary))
         return data
 
-    def test_single_consumer(self):
-        topic = "test-rep-three"
+    @matrix(new_consumer=[True, False], num_consumers=[1])
+    def test_consumer_throughput(self, new_consumer, num_consumers):
+        """
+        Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
+        (using new consumer iff new_consumer == True), and report throughput.
+        """
+        num_records = 10 * 1000 * 1000  # 10e6
 
+        # seed kafka w/messages
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
-            topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+            topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
         )
         self.producer.run()
 
-        # All consumer tests use the messages from the first benchmark, so
-        # they'll get messages of the default message size
-        self.logger.info("BENCHMARK: Single consumer")
-        self.perf = ConsumerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
-        )
-        self.perf.run()
-
-        data = compute_throughput(self.perf)
-        self.logger.info("Single consumer: %s" % data)
-        return data
-
-    def test_three_consumers(self):
-        topic = "test-rep-three"
-
-        self.producer = ProducerPerformanceService(
-            self.test_context, 1, self.kafka,
-            topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
-            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
-        )
-        self.producer.run()
-
-        self.logger.info("BENCHMARK: Three consumers")
-        self.perf = ConsumerPerformanceService(
-            self.test_context, 3, self.kafka,
-            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
-        )
-        self.perf.run()
-
-        data = compute_throughput(self.perf)
-        self.logger.info("Three consumers: %s", data)
-        return data
+        # consume
+        self.consumer = ConsumerPerformanceService(
+            self.test_context, num_consumers, self.kafka,
+            topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+        self.consumer.group = "test-consumer-group"
+        self.consumer.run()
+        return compute_aggregate_throughput(self.consumer)
 
 
 def throughput(records_per_sec, mb_per_sec):
@@ -256,19 +203,9 @@ def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
     }
 
 
-def compute_throughput(perf):
+def compute_aggregate_throughput(perf):
     """Helper method for computing throughput after running a performance service."""
     aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
     aggregate_mbps = sum([r['mbps'] for r in perf.results])
 
     return throughput(aggregate_rate, aggregate_mbps)
-
-
-
-
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8b1bca4/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index fd31c1a..1a9cf04 100644
--- a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -148,9 +148,9 @@ public class ProducerPerformance {
         }
 
         public void printTotal() {
-            long ellapsed = System.currentTimeMillis() - start;
-            double recsPerSec = 1000.0 * count / (double) ellapsed;
-            double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
+            long elapsed = System.currentTimeMillis() - start;
+            double recsPerSec = 1000.0 * count / (double) elapsed;
+            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
             int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
             System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
                               count,


Mime
View raw message