kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2562: update kafka scripts to use new tools/code
Date Fri, 30 Oct 2015 22:30:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d50499a0e -> 1cc44830b


KAFKA-2562: update kafka scripts to use new tools/code

Updated  kafka-producer-perf-test.sh to use org.apache.kafka.clients.tools.ProducerPerformance.
Updated build.gradle to add kafka-tools-0.9.0.0-SNAPSHOT.jar to kafka/libs  folder.

Author: Manikumar reddy O <manikumar.reddy@gmail.com>

Reviewers: Gwen Shapira, Ismael Juma

Closes #242 from omkreddy/KAFKA-2562


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cc44830
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cc44830
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cc44830

Branch: refs/heads/trunk
Commit: 1cc44830b90688f1a2034243b8ee03b97101f75c
Parents: d50499a
Author: Manikumar reddy O <manikumar.reddy@gmail.com>
Authored: Fri Oct 30 15:30:34 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Oct 30 15:30:34 2015 -0700

----------------------------------------------------------------------
 bin/kafka-producer-perf-test.sh                 |   2 +-
 bin/windows/kafka-producer-perf-test.bat        |   2 +-
 build.gradle                                    |   5 +-
 .../performance/producer_performance.py         |   2 +-
 .../apache/kafka/tools/ProducerPerformance.java | 152 ++++++++++++++-----
 5 files changed, 117 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/bin/kafka-producer-perf-test.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-producer-perf-test.sh b/bin/kafka-producer-perf-test.sh
index 84ac949..f583662 100755
--- a/bin/kafka-producer-perf-test.sh
+++ b/bin/kafka-producer-perf-test.sh
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/bin/windows/kafka-producer-perf-test.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-producer-perf-test.bat b/bin/windows/kafka-producer-perf-test.bat
index a894752..55b024b 100644
--- a/bin/windows/kafka-producer-perf-test.bat
+++ b/bin/windows/kafka-producer-perf-test.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
 
 SetLocal
 set KAFKA_HEAP_OPTS=-Xmx512M
-%~dp0kafka-run-class.bat kafka.tools.ProducerPerformance %*
+%~dp0kafka-run-class.bat org.apache.kafka.tools.ProducerPerformance %*
 EndLocal

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 279c51f..1987516 100644
--- a/build.gradle
+++ b/build.gradle
@@ -254,7 +254,7 @@ project(':core') {
 
   dependencies {
     compile project(':clients')
-    compile project(':log4j-appender')
+    compile "$slf4jlog4j"
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
     compile 'com.101tec:zkclient:0.6'
@@ -318,6 +318,9 @@ project(':core') {
     from(configurations.runtime) { into("libs/") }
     from(configurations.archives.artifacts.files) { into("libs/") }
     from(project.siteDocsTar) { into("site-docs/") }
+    from(project(':log4j-appender').jar) { into("libs/") }
+    from(project(':tools').jar) { into("libs/") }
+    from(project(':tools').configurations.runtime) { into("libs/") }
   }
 
   jar {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 401d6f7..a3b1d0e 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -53,7 +53,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             'kafka_directory': kafka_dir(node)
             })
         cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance
" \
-              "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s
client.id=%(client_id)s" % args
+              "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d
--throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s"
% args
 
         self.security_config.setup_node(node)
         if self.security_protocol == SecurityConfig.SSL:

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 3a90a10..3a06862 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -3,67 +3,135 @@
  * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
  * License. You may obtain a copy of the License at
- *
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.tools;
 
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
 import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+
 import org.apache.kafka.clients.producer.*;
 
 public class ProducerPerformance {
 
     public static void main(String[] args) throws Exception {
-        if (args.length < 4) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
-                               " topic_name num_records record_size target_records_sec [prop_name=prop_value]*");
-            System.exit(1);
-        }
+        ArgumentParser parser = argParser();
 
-        /* parse args */
-        String topicName = args[0];
-        long numRecords = Long.parseLong(args[1]);
-        int recordSize = Integer.parseInt(args[2]);
-        int throughput = Integer.parseInt(args[3]);
-
-        Properties props = new Properties();
-        for (int i = 4; i < args.length; i++) {
-            String[] pieces = args[i].split("=");
-            if (pieces.length != 2)
-                throw new IllegalArgumentException("Invalid property: " + args[i]);
-            props.put(pieces[0], pieces[1]);
-        }
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
-
-        /* setup perf test */
-        byte[] payload = new byte[recordSize];
-        Arrays.fill(payload, (byte) 1);
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName,
payload);
-        Stats stats = new Stats(numRecords, 5000);
-        long startMs = System.currentTimeMillis();
-
-        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
-        for (int i = 0; i < numRecords; i++) {
-            long sendStartMs = System.currentTimeMillis();
-            Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
-            producer.send(record, cb);
-
-            if (throttler.shouldThrottle(i, sendStartMs)) {
-                throttler.throttle();
+        try {
+            Namespace res = parser.parseArgs(args);
+
+            /* parse args */
+            String topicName = res.getString("topic");
+            long numRecords = res.getLong("numRecords");
+            int recordSize = res.getInt("recordSize");
+            int throughput = res.getInt("throughput");
+            List<String> producerProps = res.getList("producerConfig");
+
+            Properties props = new Properties();
+            if (producerProps != null)
+                for (String prop : producerProps) {
+                    String[] pieces = prop.split("=");
+                    if (pieces.length != 2)
+                        throw new IllegalArgumentException("Invalid property: " + prop);
+                    props.put(pieces[0], pieces[1]);
+                }
+
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
+
+            /* setup perf test */
+            byte[] payload = new byte[recordSize];
+            Arrays.fill(payload, (byte) 1);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName,
payload);
+            Stats stats = new Stats(numRecords, 5000);
+            long startMs = System.currentTimeMillis();
+
+            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
+            for (int i = 0; i < numRecords; i++) {
+                long sendStartMs = System.currentTimeMillis();
+                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
+                producer.send(record, cb);
+
+                if (throttler.shouldThrottle(i, sendStartMs)) {
+                    throttler.throttle();
+                }
+            }
+
+            /* print final results */
+            producer.close();
+            stats.printTotal();
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
             }
         }
 
-        /* print final results */
-        producer.close();
-        stats.printTotal();
+    }
+
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("producer-performance")
+                .defaultHelp(true)
+                .description("This tool is used to verify the producer performance.");
+
+        parser.addArgument("--topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TOPIC")
+                .help("produce messages to this topic");
+
+        parser.addArgument("--num-records")
+                .action(store())
+                .required(true)
+                .type(Long.class)
+                .metavar("NUM-RECORDS")
+                .dest("numRecords")
+                .help("number of messages to produce");
+
+        parser.addArgument("--record-size")
+                .action(store())
+                .required(true)
+                .type(Integer.class)
+                .metavar("RECORD-SIZE")
+                .dest("recordSize")
+                .help("message size in bytes");
+
+        parser.addArgument("--throughput")
+                .action(store())
+                .required(true)
+                .type(Integer.class)
+                .metavar("THROUGHPUT")
+                .help("throttle maximum message throughput to *approximately* THROUGHPUT
messages/sec");
+
+        parser.addArgument("--producer-props")
+                 .nargs("+")
+                 .required(true)
+                 .metavar("PROP-NAME=PROP-VALUE")
+                 .type(String.class)
+                 .dest("producerConfig")
+                 .help("kafka producer related configuaration properties like bootstrap.servers,client.id
etc..");
+
+        return parser;
     }
 
     private static class Stats {


Mime
View raw message