kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2101; Metric metadata-age is reset on a failed update; patched by Tim Brooks; reviewed by Jun Rao
Date Tue, 16 Jun 2015 00:44:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 719f2bddd -> 20a31a29f


kafka-2101; Metric metadata-age is reset on a failed update; patched by Tim Brooks; reviewed
by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20a31a29
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20a31a29
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20a31a29

Branch: refs/heads/trunk
Commit: 20a31a29f7aa6ce6687a13aa0cf60b92c5ac4d1e
Parents: 719f2bd
Author: Tim Brooks <tbrooks8@gmail.com>
Authored: Mon Jun 15 17:43:56 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Jun 15 17:43:56 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/clients/Metadata.java  | 11 +++++++----
 .../kafka/clients/producer/internals/Sender.java      |  2 +-
 .../java/org/apache/kafka/clients/MetadataTest.java   | 14 ++++++++++++++
 3 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/20a31a29/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 07f1cdb..0387f26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -36,6 +36,7 @@ public final class Metadata {
     private final long metadataExpireMs;
     private int version;
     private long lastRefreshMs;
+    private long lastSuccessfulRefreshMs;
     private Cluster cluster;
     private boolean needUpdate;
     private final Set<String> topics;
@@ -57,6 +58,7 @@ public final class Metadata {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
         this.lastRefreshMs = 0L;
+        this.lastSuccessfulRefreshMs = 0L;
         this.version = 0;
         this.cluster = Cluster.empty();
         this.needUpdate = false;
@@ -83,7 +85,7 @@ public final class Metadata {
      * is now
      */
     public synchronized long timeToNextUpdate(long nowMs) {
-        long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs
- nowMs, 0);
+        long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs
- nowMs, 0);
         long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
         return Math.max(timeToExpire, timeToAllowUpdate);
     }
@@ -146,6 +148,7 @@ public final class Metadata {
     public synchronized void update(Cluster cluster, long now) {
         this.needUpdate = false;
         this.lastRefreshMs = now;
+        this.lastSuccessfulRefreshMs = now;
         this.version += 1;
         this.cluster = cluster;
         notifyAll();
@@ -168,10 +171,10 @@ public final class Metadata {
     }
 
     /**
-     * The last time metadata was updated.
+     * The last time metadata was successfully updated.
      */
-    public synchronized long lastUpdate() {
-        return this.lastRefreshMs;
+    public synchronized long lastSuccessfulUpdate() {
+        return this.lastSuccessfulRefreshMs;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/20a31a29/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 07e65d4..0baf16e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -410,7 +410,7 @@ public class Sender implements Runnable {
             m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the
current producer metadata being used.", metricTags);
             metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
-                    return (now - metadata.lastUpdate()) / 1000.0;
+                    return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/20a31a29/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 928087d..249d6b8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -85,6 +85,20 @@ public class MetadataTest {
         }
     }
 
+    @Test
+    public void testFailedUpdate() {
+        long time = 100;
+        metadata.update(Cluster.empty(), time);
+
+        assertEquals(100, metadata.timeToNextUpdate(1000));
+        metadata.failedUpdate(1100);
+
+        assertEquals(100, metadata.timeToNextUpdate(1100));
+        assertEquals(100, metadata.lastSuccessfulUpdate());
+
+    }
+
+
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {


Mime
View raw message