kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-5068; Optionally print out metrics after running the perf tests
Date Wed, 19 Apr 2017 16:53:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7c436d388 -> 609e9b0b2


KAFKA-5068; Optionally print out metrics after running the perf tests

junrao added a config `--print.metrics` to control whether ProducerPerformance prints out
metrics at the end of the test. If its okay, will add the code counterpart for consumer.

Author: huxi <huxi@zhenrongbao.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2860 from amethystic/kafka-5068_print_metrics_in_perf_tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/609e9b0b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/609e9b0b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/609e9b0b

Branch: refs/heads/trunk
Commit: 609e9b0b2f46ce72ed91965f7e43c512b26a609b
Parents: 7c436d3
Author: huxi <huxi@zhenrongbao.com>
Authored: Wed Apr 19 09:53:13 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Apr 19 09:53:13 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsumerPerformance.scala | 18 ++++++-
 .../src/main/scala/kafka/utils/ToolsUtils.scala | 32 ++++++++++++
 .../apache/kafka/tools/ProducerPerformance.java | 20 +++++--
 .../java/org/apache/kafka/tools/ToolsUtils.java | 55 ++++++++++++++++++++
 4 files changed, 119 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index b7087f2..a5d4d5d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -27,8 +27,8 @@ import org.apache.log4j.Logger
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.TopicPartition
-import kafka.utils.CommandLineUtils
+import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
+import kafka.utils.{CommandLineUtils, ToolsUtils}
 import java.util.{Collections, Properties, Random}
 
 import kafka.consumer.Consumer
@@ -38,6 +38,8 @@ import kafka.consumer.ConsumerTimeoutException
 import java.text.SimpleDateFormat
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable
+
 /**
  * Performance test for the full zookeeper consumer
  */
@@ -51,6 +53,7 @@ object ConsumerPerformance {
     val totalMessagesRead = new AtomicLong(0)
     val totalBytesRead = new AtomicLong(0)
     val consumerTimeout = new AtomicBoolean(false)
+    var metrics: mutable.Map[MetricName, _ <: Metric] = null
 
     if (!config.hideHeader) {
       if (!config.showDetailedStats)
@@ -66,6 +69,10 @@ object ConsumerPerformance {
       startMs = System.currentTimeMillis
       consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead,
totalBytesRead)
       endMs = System.currentTimeMillis
+
+      if (config.printMetrics) {
+        metrics = consumer.metrics().asScala
+      }
       consumer.close()
     } else {
       import kafka.consumer.ConsumerConfig
@@ -96,6 +103,11 @@ object ConsumerPerformance {
       println("%s, %s, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
         totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get
/ elapsedSecs))
     }
+
+    if (metrics != null) {
+      ToolsUtils.printMetrics(metrics)
+    }
+
   }
 
   def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count:
Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead:
AtomicLong) {
@@ -210,12 +222,14 @@ object ConsumerPerformance {
       .withRequiredArg
       .describedAs("config file")
       .ofType(classOf[String])
+    val printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics. This only
applies to new consumer.")
 
     val options = parser.parse(args: _*)
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)
 
     val useOldConsumer = options.has(zkConnectOpt)
+    val printMetrics = options.has(printMetricsOpt)
 
     val props = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/core/src/main/scala/kafka/utils/ToolsUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 65758d8..1be5a45 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -16,7 +16,14 @@
 */
 package kafka.utils
 
+import java.util
+import java.util.Comparator
+
 import joptsimple.OptionParser
+import org.apache.kafka.common.{Metric, MetricName}
+
+import scala.collection.immutable.ListMap
+import scala.collection.mutable
 
 object ToolsUtils {
 
@@ -33,4 +40,29 @@ object ToolsUtils {
     if(!isValid)
       CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n
")
   }
+
+  /**
+    * print out the metrics in alphabetical order
+    * @param metrics  the metrics to be printed out
+    */
+  def printMetrics(metrics: mutable.Map[MetricName, _ <: Metric]): Unit = {
+    var maxLengthOfDisplayName = 0
+
+    val sortedMap = metrics.toSeq.sortWith( (s,t) =>
+      Array(s._1.group(), s._1.name(), s._1.tags()).mkString(":")
+        .compareTo(Array(t._1.group(), t._1.name(), t._1.tags()).mkString(":")) < 0
+    ).map {
+      case (key, value) =>
+        val mergedKeyName = Array(key.group(), key.name(), key.tags()).mkString(":")
+        if (maxLengthOfDisplayName < mergedKeyName.length) {
+          maxLengthOfDisplayName = mergedKeyName.length
+        }
+        (mergedKeyName, value.value())
+    }
+    println(s"\n%-${maxLengthOfDisplayName}s   %s".format("Metric Name", "Value"))
+    sortedMap.foreach {
+      case (metricName, value) =>
+        println(s"%-${maxLengthOfDisplayName}s : %.3f".format(metricName, value))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index e96814d..b861b9d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -17,17 +17,17 @@
 package org.apache.kafka.tools;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Arrays;
 
 import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import org.apache.kafka.clients.producer.Callback;
@@ -59,6 +59,7 @@ public class ProducerPerformance {
             List<String> producerProps = res.getList("producerConfig");
             String producerConfig = res.getString("producerConfigFile");
             String payloadFilePath = res.getString("payloadFile");
+            boolean shouldPrintMetrics = res.getBoolean("printMetrics");
 
             // since default value gets printed with the help text, we are escaping \n there
and replacing it with correct value here.
             String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n"
: res.getString("payloadDelimiter");
@@ -127,10 +128,14 @@ public class ProducerPerformance {
                     throttler.throttle();
                 }
             }
-
             /* print final results */
-            producer.close();
             stats.printTotal();
+
+            /* print out metrics */
+            if (shouldPrintMetrics) {
+                ToolsUtils.printMetrics(producer.metrics());
+            }
+            producer.close();
         } catch (ArgumentParserException e) {
             if (args.length == 0) {
                 parser.printHelp();
@@ -223,6 +228,13 @@ public class ProducerPerformance {
                 .dest("producerConfigFile")
                 .help("producer config properties file.");
 
+        parser.addArgument("--print-metrics")
+                .action(storeTrue())
+                .type(Boolean.class)
+                .metavar("PRINT-METRICS")
+                .dest("printMetrics")
+                .help("print out metrics at the end of the test.");
+
         return parser;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/609e9b0b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
new file mode 100644
index 0000000..e02bbb0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class ToolsUtils {
+
+    /**
+     * print out the metrics in alphabetical order
+     * @param metrics   the metrics to be printed out
+     */
+    public static void printMetrics(Map<MetricName, ? extends Metric> metrics) {
+        if (metrics != null && !metrics.isEmpty()) {
+            int maxLengthOfDisplayName = 0;
+            TreeMap<String, Double> sortedMetrics = new TreeMap<>(new Comparator<String>()
{
+                @Override
+                public int compare(String o1, String o2) {
+                    return o1.compareTo(o2);
+                }
+            });
+            for (Metric metric : metrics.values()) {
+                MetricName mName = metric.metricName();
+                String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags();
+                maxLengthOfDisplayName = maxLengthOfDisplayName < mergedName.length()
? mergedName.length() : maxLengthOfDisplayName;
+                sortedMetrics.put(mergedName, metric.value());
+            }
+            String outputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f";
+            System.out.println(String.format("\n%-" + maxLengthOfDisplayName + "s   %s",
"Metric Name", "Value"));
+
+            for (Map.Entry<String, Double> entry : sortedMetrics.entrySet()) {
+                System.out.println(String.format(outputFormat, entry.getKey(), entry.getValue()));
+            }
+        }
+    }
+}


Mime
View raw message