kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: KAFKA-1472; Add the compression ratio metrics in the new producer; patched by Dong Lin; reviewed by Guozhang Wang and Jun Rao
Date Tue, 10 Jun 2014 23:51:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk dee16451e -> bb0184757


KAFKA-1472; Add the compression ratio metrics in the new producer; patched by Dong Lin; reviewed
by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb018475
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb018475
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb018475

Branch: refs/heads/trunk
Commit: bb01847579dcf86841956c29278649bcf8609732
Parents: dee1645
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Jun 10 16:51:10 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jun 10 16:51:10 2014 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      | 28 +++++++++++++++-----
 .../apache/kafka/common/record/Compressor.java  |  8 ++++++
 .../kafka/common/record/MemoryRecords.java      | 10 +++++++
 3 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bb018475/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 9b1f565..4352466 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
@@ -179,7 +179,7 @@ public class Sender implements Runnable {
 
     /**
      * Run a single iteration of sending
-     * 
+     *
      * @param nowMs The current POSIX time in milliseconds
      */
     public void run(long nowMs) {
@@ -715,7 +715,7 @@ public class Sender implements Runnable {
 
         /**
          * Can we send more requests to this node?
-         * 
+         *
          * @param node Node in question
          * @return true iff we have no requests still being sent to the given node
          */
@@ -727,7 +727,7 @@ public class Sender implements Runnable {
 
         /**
          * Clear out all the in-flight requests for the given node and return them
-         * 
+         *
          * @param node The node
          * @return All the in-flight requests for that node that have been removed
          */
@@ -760,6 +760,7 @@ public class Sender implements Runnable {
         public final Sensor requestTimeSensor;
         public final Sensor recordsPerRequestSensor;
         public final Sensor batchSizeSensor;
+        public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
 
         public SenderMetrics(Metrics metrics) {
@@ -768,6 +769,11 @@ public class Sender implements Runnable {
             this.batchSizeSensor = metrics.sensor("batch-size");
             this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent
per partition per-request.", new Avg());
 
+            this.compressionRateSensor = metrics.sensor("compression-rate");
+            this.compressionRateSensor.add("compression-rate-avg",
+                                     "The average compression rate of record batches.",
+                                     new Avg());
+
             this.queueTimeSensor = metrics.sensor("queue-time");
             this.queueTimeSensor.add("record-queue-time-avg",
                                      "The average time in ms record batches spent in the
record accumulator.",
@@ -818,6 +824,10 @@ public class Sender implements Runnable {
                 Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
                 topicByteRate.add("topic." + topic + ".byte-rate", new Rate());
 
+                String topicCompressionRateName = "topic." + topic + ".compression-rate";
+                Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
+                topicCompressionRate.add("topic." + topic + ".compression-rate", new Avg());
+
                 String topicRetryName = "topic." + topic + ".record-retries";
                 Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
                 topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
@@ -851,9 +861,15 @@ public class Sender implements Runnable {
                         Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
                         topicByteRate.record(batch.records.sizeInBytes());
 
+                        // per-topic compression rate
+                        String topicCompressionRateName = "topic." + topic + ".compression-rate";
+                        Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
+                        topicCompressionRate.record(batch.records.compressionRate());
+
                         // global metrics
                         this.batchSizeSensor.record(batch.records.sizeInBytes(), nowMs);
                         this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, nowMs);
+                        this.compressionRateSensor.record(batch.records.compressionRate());
                         this.maxRecordSizeSensor.record(batch.maxRecordSize, nowMs);
                         records += batch.recordCount;
                     }
@@ -876,7 +892,7 @@ public class Sender implements Runnable {
             this.errorSensor.record(count, nowMs);
             String topicErrorName = "topic." + topic + ".record-errors";
             Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
-            if (topicErrorSensor != null) 
+            if (topicErrorSensor != null)
               topicErrorSensor.record(count, nowMs);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb018475/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 0fa6dd2..0323f5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -80,6 +80,14 @@ public class Compressor {
     public ByteBuffer buffer() {
         return bufferStream.buffer();
     }
+    
+    public double compressionRate() {
+        ByteBuffer buffer = bufferStream.buffer();
+        if (this.writtenUncompressed == 0)
+            return 1.0;
+        else
+            return (double) buffer.position() / this.writtenUncompressed;
+    }
 
     public void close() {
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb018475/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 428968c..15c9577 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -122,6 +122,16 @@ public class MemoryRecords implements Records {
     public int sizeInBytes() {
         return compressor.buffer().position();
     }
+    
+    /**
+     * The compression rate of this record set
+     */
+    public double compressionRate() {
+        if (compressor == null)
+            return 1.0;
+        else
+            return compressor.compressionRate();
+    }
 
     /**
      * Return the capacity of the buffer


Mime
View raw message