kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-4432; Added support to supply custom message payloads to perf-producer script.
Date Fri, 20 Jan 2017 01:47:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 813897a00 -> a37bf5fff


KAFKA-4432; Added support to supply custom message payloads to perf-producer script.

The current implementation of ProducerPerformance creates a static payload. This is not very useful
for testing compression, or when you want to test with production/custom payloads. We therefore
added support for providing a payload file as input to the producer perf test script.

We made the following changes:
1. Added support for providing a payload file containing the list of payloads that you actually
want to send.
2. Moved payload generation into the send loop for the case where a payload file is provided (a payload is then selected per message).

The following changes affect how the producer performance script is invoked:
1. You must provide either "--record-size" or "--payload-file", but not both. This is because the record
size cannot be guaranteed when you are using custom payloads.
  e.g. ./kafka-producer-perf-test.sh --topic test_topic --num-records 100000 --producer-props bootstrap.servers=127.0.0.1:9092 acks=0 buffer.memory=33554432 compression.type=gzip batch.size=10240 linger.ms=10 --throughput -1 --payload-file ./test_payloads --payload-delimiter ,
2. Previously, "--record-size" was a required option; now you must provide exactly one of "--record-size"
or "--payload-file". Providing both will result in an error.
3. A new parameter, "--payload-delimiter", has been added; it defaults
to "\n". The delimiter is passed to Java's String.split, so it is interpreted as a regular expression.

Author: Sandesh K <sandesh.karkera@flipkart.com>

Reviewers: dan norwood <norwood@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #2158 from SandeshKarkera/PerfProducerChanges


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a37bf5ff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a37bf5ff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a37bf5ff

Branch: refs/heads/trunk
Commit: a37bf5fffa4a607ded14ec055276d76f19df2d50
Parents: 813897a
Author: Sandesh K <sandesh.karkera@flipkart.com>
Authored: Thu Jan 19 17:46:59 2017 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Jan 19 17:46:59 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/tools/ProducerPerformance.java | 82 +++++++++++++++++---
 1 file changed, 72 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a37bf5ff/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index a13d3ec..c277b83 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
@@ -14,11 +14,18 @@ package org.apache.kafka.tools;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -42,15 +49,36 @@ public class ProducerPerformance {
             /* parse args */
             String topicName = res.getString("topic");
             long numRecords = res.getLong("numRecords");
-            int recordSize = res.getInt("recordSize");
+            Integer recordSize = res.getInt("recordSize");
             int throughput = res.getInt("throughput");
             List<String> producerProps = res.getList("producerConfig");
             String producerConfig = res.getString("producerConfigFile");
+            String payloadFilePath = res.getString("payloadFile");
+
+            // since default value gets printed with the help text, we are escaping \n there
and replacing it with correct value here.
+            String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n"
: res.getString("payloadDelimiter");
 
             if (producerProps == null && producerConfig == null) {
                 throw new ArgumentParserException("Either --producer-props or --producer.config
must be specified.", parser);
             }
 
+            List<byte[]> payloadByteList = new ArrayList<>();
+            if (payloadFilePath != null) {
+                Path path = Paths.get(payloadFilePath);
+                System.out.println("Reading payloads from: " + path.toAbsolutePath());
+                if (Files.notExists(path) || Files.size(path) == 0)  {
+                    throw new  IllegalArgumentException("File does not exist or empty file
provided.");
+                }
+
+                String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter);
+
+                System.out.println("Number of messages read: " + payloadList.length);
+
+                for (String payload : payloadList) {
+                    payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
+                }
+            }
+
             Properties props = new Properties();
             if (producerConfig != null) {
                 props.putAll(Utils.loadProps(producerConfig));
@@ -68,16 +96,24 @@ public class ProducerPerformance {
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
 
             /* setup perf test */
-            byte[] payload = new byte[recordSize];
+            byte[] payload = null;
             Random random = new Random(0);
-            for (int i = 0; i < payload.length; ++i)
-                payload[i] = (byte) (random.nextInt(26) + 65);
-            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName,
payload);
+            if (recordSize != null) {
+                payload = new byte[recordSize];
+                for (int i = 0; i < payload.length; ++i)
+                    payload[i] = (byte) (random.nextInt(26) + 65);
+            }
+            ProducerRecord<byte[], byte[]> record;
             Stats stats = new Stats(numRecords, 5000);
             long startMs = System.currentTimeMillis();
 
             ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
             for (int i = 0; i < numRecords; i++) {
+                if (payloadFilePath != null) {
+                    payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
+                }
+                record = new ProducerRecord<>(topicName, payload);
+
                 long sendStartMs = System.currentTimeMillis();
                 Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
                 producer.send(record, cb);
@@ -109,6 +145,11 @@ public class ProducerPerformance {
                 .defaultHelp(true)
                 .description("This tool is used to verify the producer performance.");
 
+        MutuallyExclusiveGroup payloadOptions = parser
+                .addMutuallyExclusiveGroup()
+                .required(true)
+                .description("either --record-size or --payload-file must be specified but
not both.");
+
         parser.addArgument("--topic")
                 .action(store())
                 .required(true)
@@ -124,13 +165,34 @@ public class ProducerPerformance {
                 .dest("numRecords")
                 .help("number of messages to produce");
 
-        parser.addArgument("--record-size")
+        payloadOptions.addArgument("--record-size")
                 .action(store())
-                .required(true)
+                .required(false)
                 .type(Integer.class)
                 .metavar("RECORD-SIZE")
                 .dest("recordSize")
-                .help("message size in bytes");
+                .help("message size in bytes. Note that you must provide exactly one of --record-size
or --payload-file.");
+
+        payloadOptions.addArgument("--payload-file")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("PAYLOAD-FILE")
+                .dest("payloadFile")
+                .help("file to read the message payloads from. This works only for UTF-8
encoded text files. " +
+                        "Payloads will be read from this file and a payload will be randomly
selected when sending messages. " +
+                        "Note that you must provide exactly one of --record-size or --payload-file.");
+
+        parser.addArgument("--payload-delimiter")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("PAYLOAD-DELIMITER")
+                .dest("payloadDelimiter")
+                .setDefault("\\n")
+                .help("provides delimiter to be used when --payload-file is provided. " +
+                        "Defaults to new line. " +
+                        "Note that this parameter will be ignored if --payload-file is not
provided.");
 
         parser.addArgument("--throughput")
                 .action(store())


Mime
View raw message