kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: TRIVIAL: Fix misc. numerical issues in histogram.
Date Thu, 06 Feb 2014 18:50:10 GMT
Updated Branches:
  refs/heads/trunk 253f86e31 -> 3220af1fe


TRIVIAL: Fix misc. numerical issues in histogram.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3220af1f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3220af1f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3220af1f

Branch: refs/heads/trunk
Commit: 3220af1fe2fad8da1f5bcc101ab9d3e8919b03bd
Parents: 253f86e
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Thu Feb 6 10:49:37 2014 -0800
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Thu Feb 6 10:49:37 2014 -0800

----------------------------------------------------------------------
 .../kafka/common/metrics/stats/Histogram.java   |  8 ++-
 .../kafka/common/metrics/stats/Percentiles.java | 74 ++++++++++++++------
 .../kafka/common/metrics/stats/SampledStat.java | 12 ++--
 .../java/kafka/common/metrics/MetricsTest.java  | 34 +++++++++
 .../common/metrics/stats/HistogramTest.java     | 44 ++++++++++--
 5 files changed, 136 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
index c59b585..9922571 100644
--- a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
+++ b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
@@ -18,7 +18,7 @@ public class Histogram {
     }
 
     public double value(double quantile) {
-        if (count == 0L)
+        if (count == 0.0d)
             return Double.NaN;
         float sum = 0.0f;
         float quant = (float) quantile;
@@ -30,6 +30,10 @@ public class Histogram {
         return Float.POSITIVE_INFINITY;
     }
 
+    public float[] counts() {
+        return this.hist;
+    }
+
     public void clear() {
         for (int i = 0; i < this.hist.length; i++)
             this.hist[i] = 0.0f;
@@ -117,7 +121,7 @@ public class Histogram {
             if (b == this.bins - 1) {
                 return Float.POSITIVE_INFINITY;
             } else {
-                double unscaled = (b * (b - 1.0)) / 2.0;
+                double unscaled = (b * (b + 1.0)) / 2.0;
                 return unscaled * this.scale;
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
index 686c726..c3f8942 100644
--- a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
@@ -13,35 +13,33 @@ import kafka.common.metrics.stats.Histogram.LinearBinScheme;
 /**
  * A compound stat that reports one or more percentiles
  */
-public class Percentiles implements CompoundStat {
+public class Percentiles extends SampledStat implements CompoundStat {
 
     public static enum BucketSizing {
         CONSTANT, LINEAR
     }
 
+    private final int buckets;
     private final Percentile[] percentiles;
-    private Histogram current;
-    private Histogram shadow;
-    private long lastWindow;
-    private long eventCount;
+    private final BinScheme binScheme;
 
     public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
         this(sizeInBytes, 0.0, max, bucketing, percentiles);
     }
 
     public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
+        super(0.0);
         this.percentiles = percentiles;
-        BinScheme scheme = null;
+        this.buckets = sizeInBytes / 4;
         if (bucketing == BucketSizing.CONSTANT) {
-            scheme = new ConstantBinScheme(sizeInBytes / 4, min, max);
+            this.binScheme = new ConstantBinScheme(buckets, min, max);
         } else if (bucketing == BucketSizing.LINEAR) {
             if (min != 0.0d)
                 throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
-            scheme = new LinearBinScheme(sizeInBytes / 4, max);
+            this.binScheme = new LinearBinScheme(buckets, max);
+        } else {
+            throw new IllegalArgumentException("Unknown bucket type: " + bucketing);
         }
-        this.current = new Histogram(scheme);
-        this.shadow = new Histogram(scheme);
-        this.eventCount = 0L;
     }
 
     @Override
@@ -51,26 +49,56 @@ public class Percentiles implements CompoundStat {
             final double pct = percentile.percentile();
             ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
                 public double measure(MetricConfig config, long now) {
-                    return current.value(pct / 100.0);
+                    return value(config, now, pct / 100.0);
                 }
             }));
         }
         return ms;
     }
 
+    public double value(MetricConfig config, long now, double quantile) {
+        timeoutObsoleteSamples(config, now);
+        float count = 0.0f;
+        for (Sample sample : this.samples)
+            count += sample.eventCount;
+        if (count == 0.0f)
+            return Double.NaN;
+        float sum = 0.0f;
+        float quant = (float) quantile;
+        for (int b = 0; b < buckets; b++) {
+            for (int s = 0; s < this.samples.size(); s++) {
+                HistogramSample sample = (HistogramSample) this.samples.get(s);
+                float[] hist = sample.histogram.counts();
+                sum += hist[b];
+                if (sum / count > quant)
+                    return binScheme.fromBin(b);
+            }
+        }
+        return Double.POSITIVE_INFINITY;
+    }
+
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        return value(config, now, 0.5);
+    }
+
     @Override
-    public void record(MetricConfig config, double value, long time) {
-        long ellapsed = time - this.lastWindow;
-        if (ellapsed > config.timeWindowNs() / 2 || this.eventCount > config.eventWindow() / 2)
-            this.shadow.clear();
-        if (ellapsed > config.timeWindowNs() || this.eventCount > config.eventWindow()) {
-            Histogram tmp = this.current;
-            this.current = this.shadow;
-            this.shadow = tmp;
-            this.shadow.clear();
+    protected HistogramSample newSample(long now) {
+        return new HistogramSample(this.binScheme, now);
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        HistogramSample hist = (HistogramSample) sample;
+        hist.histogram.record(value);
+    }
+
+    private static class HistogramSample extends SampledStat.Sample {
+        private final Histogram histogram;
+
+        private HistogramSample(BinScheme scheme, long now) {
+            super(0.0, now);
+            this.histogram = new Histogram(scheme);
         }
-        this.current.record(value);
-        this.shadow.record(value);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
index 6f820fa..e696af5 100644
--- a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
@@ -20,7 +20,7 @@ public abstract class SampledStat implements MeasurableStat {
 
     private double initialValue;
     private int current = 0;
-    private List<Sample> samples;
+    protected List<Sample> samples;
 
     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
@@ -39,7 +39,7 @@ public abstract class SampledStat implements MeasurableStat {
     private Sample advance(MetricConfig config, long now) {
         this.current = (this.current + 1) % config.samples();
         if (this.current >= samples.size()) {
-            Sample sample = new Sample(this.initialValue, now);
+            Sample sample = newSample(now);
             this.samples.add(sample);
             return sample;
         } else {
@@ -49,6 +49,10 @@ public abstract class SampledStat implements MeasurableStat {
         }
     }
 
+    protected Sample newSample(long now) {
+        return new Sample(this.initialValue, now);
+    }
+
     @Override
     public double measure(MetricConfig config, long now) {
         timeoutObsoleteSamples(config, now);
@@ -57,7 +61,7 @@ public abstract class SampledStat implements MeasurableStat {
 
     public Sample current(long now) {
         if (samples.size() == 0)
-            this.samples.add(new Sample(initialValue, now));
+            this.samples.add(newSample(now));
         return this.samples.get(this.current);
     }
 
@@ -70,7 +74,7 @@ public abstract class SampledStat implements MeasurableStat {
     public abstract double combine(List<Sample> samples, MetricConfig config, long now);
 
     /* Timeout any windows that have expired in the absense of any events */
-    private void timeoutObsoleteSamples(MetricConfig config, long now) {
+    protected void timeoutObsoleteSamples(MetricConfig config, long now) {
         for (int i = 0; i < samples.size(); i++) {
             int idx = (this.current + i) % samples.size();
             Sample sample = this.samples.get(idx);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/test/java/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/kafka/common/metrics/MetricsTest.java
index 7d06864..f66cc7f 100644
--- a/clients/src/test/java/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/kafka/common/metrics/MetricsTest.java
@@ -6,6 +6,7 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
+import kafka.common.Metric;
 import kafka.common.metrics.stats.Avg;
 import kafka.common.metrics.stats.Count;
 import kafka.common.metrics.stats.Max;
@@ -163,6 +164,39 @@ public class MetricsTest {
         }
     }
 
+    @Test
+    public void testPercentiles() {
+        int buckets = 100;
+        Percentiles percs = new Percentiles(4 * buckets,
+                                            0.0,
+                                            100.0,
+                                            BucketSizing.CONSTANT,
+                                            new Percentile("test.p25", 25),
+                                            new Percentile("test.p50", 50),
+                                            new Percentile("test.p75", 75));
+        MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+        Sensor sensor = metrics.sensor("test", config);
+        sensor.add(percs);
+        Metric p25 = this.metrics.metrics().get("test.p25");
+        Metric p50 = this.metrics.metrics().get("test.p50");
+        Metric p75 = this.metrics.metrics().get("test.p75");
+
+        // record two windows worth of sequential values
+        for (int i = 0; i < buckets; i++)
+            sensor.record(i);
+
+        assertEquals(25, p25.value(), 1.0);
+        assertEquals(50, p50.value(), 1.0);
+        assertEquals(75, p75.value(), 1.0);
+
+        for (int i = 0; i < buckets; i++)
+            sensor.record(0.0);
+
+        assertEquals(0.0, p25.value(), 1.0);
+        assertEquals(0.0, p50.value(), 1.0);
+        assertEquals(0.0, p75.value(), 1.0);
+    }
+
     public static class ConstantMeasurable implements Measurable {
         public double value = 0.0;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3220af1f/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
index 03bdd2b..9c6a4ab 100644
--- a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
+++ b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
@@ -1,6 +1,10 @@
 package kafka.common.metrics.stats;
 
 import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Random;
+
 import kafka.common.metrics.stats.Histogram.BinScheme;
 import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
 import kafka.common.metrics.stats.Histogram.LinearBinScheme;
@@ -11,14 +15,14 @@ public class HistogramTest {
 
     private static final double EPS = 0.0000001d;
 
-    // @Test
+    @Test
     public void testHistogram() {
         BinScheme scheme = new ConstantBinScheme(12, -5, 5);
         Histogram hist = new Histogram(scheme);
         for (int i = -5; i < 5; i++)
             hist.record(i);
         for (int i = 0; i < 10; i++)
-            assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0), EPS);
+            assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0 + EPS), EPS);
     }
 
     @Test
@@ -33,18 +37,17 @@ public class HistogramTest {
         checkBinningConsistency(scheme);
     }
 
+    @Test
     public void testLinearBinScheme() {
-        LinearBinScheme scheme = new LinearBinScheme(5, 5);
-        for (int i = 0; i < scheme.bins(); i++)
-            System.out.println(i + " " + scheme.fromBin(i));
+        LinearBinScheme scheme = new LinearBinScheme(10, 10);
         checkBinningConsistency(scheme);
     }
 
     private void checkBinningConsistency(BinScheme scheme) {
         for (int bin = 0; bin < scheme.bins(); bin++) {
             double fromBin = scheme.fromBin(bin);
-            int binAgain = scheme.toBin(fromBin);
-            assertEquals("unbinning and rebinning " + bin
+            int binAgain = scheme.toBin(fromBin + EPS);
+            assertEquals("unbinning and rebinning the bin " + bin
                          + " gave a different result ("
                          + fromBin
                          + " was placed in bin "
@@ -53,4 +56,31 @@ public class HistogramTest {
         }
     }
 
+    public static void main(String[] args) {
+        Random random = new Random();
+        System.out.println("[-100, 100]:");
+        for (BinScheme scheme : Arrays.asList(new ConstantBinScheme(1000, -100, 100),
+                                              new ConstantBinScheme(100, -100, 100),
+                                              new ConstantBinScheme(10, -100, 100))) {
+            Histogram h = new Histogram(scheme);
+            for (int i = 0; i < 10000; i++)
+                h.record(200.0 * random.nextDouble() - 100.0);
+            for (double quantile = 0.0; quantile < 1.0; quantile += 0.05)
+                System.out.printf("%5.2f: %.1f, ", quantile, h.value(quantile));
+            System.out.println();
+        }
+
+        System.out.println("[0, 1000]");
+        for (BinScheme scheme : Arrays.asList(new LinearBinScheme(1000, 1000),
+                                              new LinearBinScheme(100, 1000),
+                                              new LinearBinScheme(10, 1000))) {
+            Histogram h = new Histogram(scheme);
+            for (int i = 0; i < 10000; i++)
+                h.record(1000.0 * random.nextDouble());
+            for (double quantile = 0.0; quantile < 1.0; quantile += 0.05)
+                System.out.printf("%5.2f: %.1f, ", quantile, h.value(quantile));
+            System.out.println();
+        }
+    }
+
 }


Mime
View raw message