kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-1359; Followup on K1359: change nanoTime to currentTimeMillis in metrics; patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao
Date Wed, 23 Apr 2014 00:48:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk caafc9d61 -> 93af67cd4


kafka-1359; Followup on K1359: change nanoTime to currentTimeMillis in metrics; patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93af67cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93af67cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93af67cd

Branch: refs/heads/trunk
Commit: 93af67cd4f75d2f4cc3a25ad999fe6349925504e
Parents: caafc9d
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Tue Apr 22 17:47:55 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Apr 22 17:47:55 2014 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Metadata.java    |  14 +-
 .../producer/internals/RecordAccumulator.java   |  26 ++--
 .../clients/producer/internals/RecordBatch.java |  12 +-
 .../clients/producer/internals/Sender.java      | 133 ++++++++++---------
 .../kafka/common/metrics/KafkaMetric.java       |   6 +-
 .../apache/kafka/common/metrics/Measurable.java |   4 +-
 .../kafka/common/metrics/MetricConfig.java      |  10 +-
 .../org/apache/kafka/common/metrics/Sensor.java |  18 +--
 .../org/apache/kafka/common/metrics/Stat.java   |   4 +-
 .../apache/kafka/common/metrics/stats/Avg.java  |   4 +-
 .../kafka/common/metrics/stats/Count.java       |   4 +-
 .../apache/kafka/common/metrics/stats/Max.java  |   4 +-
 .../apache/kafka/common/metrics/stats/Min.java  |   4 +-
 .../kafka/common/metrics/stats/Percentiles.java |  18 +--
 .../apache/kafka/common/metrics/stats/Rate.java |  28 ++--
 .../kafka/common/metrics/stats/SampledStat.java |  60 ++++-----
 .../kafka/common/metrics/stats/Total.java       |   4 +-
 .../apache/kafka/common/network/Selector.java   |  16 ++-
 .../apache/kafka/common/utils/SystemTime.java   |   1 +
 .../kafka/common/metrics/MetricsTest.java       |  26 ++--
 .../org/apache/kafka/test/Microbenchmarks.java  |   4 +-
 21 files changed, 202 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
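
The patch below replaces nanosecond timestamps with wall-clock milliseconds throughout the new client metrics. For readers skimming the diff, here is a minimal standalone Java sketch (not part of the patch) of the underlying clock semantics: System.nanoTime() has an arbitrary origin and is only valid for measuring intervals, while System.currentTimeMillis() is an epoch timestamp that can be compared with other wall-clock values such as a metadata refresh time.

public class ClockSemanticsSketch {
    public static void main(String[] args) throws InterruptedException {
        long startNs = System.nanoTime();
        long startMs = System.currentTimeMillis();
        Thread.sleep(25);
        long elapsedNs = System.nanoTime() - startNs;           // interval: fine
        long elapsedMs = System.currentTimeMillis() - startMs;  // interval: fine
        // A nanoTime() reading is not an epoch timestamp, so subtracting an
        // epoch-millisecond value from it (as parts of the pre-patch metrics
        // code effectively did) produces a meaningless number.
        System.out.printf("elapsed: %d ns, %d ms%n", elapsedNs, elapsedMs);
    }
}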


http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 33d62a4..f114ffd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -36,7 +36,7 @@ public final class Metadata {
 
     private final long refreshBackoffMs;
     private final long metadataExpireMs;
-    private long lastRefresh;
+    private long lastRefreshMs;
     private Cluster cluster;
     private boolean forceUpdate;
     private final Set<String> topics;
@@ -57,7 +57,7 @@ public final class Metadata {
     public Metadata(long refreshBackoffMs, long metadataExpireMs) {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
-        this.lastRefresh = 0L;
+        this.lastRefreshMs = 0L;
         this.cluster = Cluster.empty();
         this.forceUpdate = false;
         this.topics = new HashSet<String>();
@@ -105,8 +105,8 @@ public final class Metadata {
      * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more
      * than metadataExpireMs has passed since the last refresh)
      */
-    public synchronized boolean needsUpdate(long now) {
-        long msSinceLastUpdate = now - this.lastRefresh;
+    public synchronized boolean needsUpdate(long nowMs) {
+        long msSinceLastUpdate = nowMs - this.lastRefreshMs;
         boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
         boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs;
         return updateAllowed && updateNeeded;
@@ -129,9 +129,9 @@ public final class Metadata {
     /**
      * Update the cluster metadata
      */
-    public synchronized void update(Cluster cluster, long now) {
+    public synchronized void update(Cluster cluster, long nowMs) {
         this.forceUpdate = false;
-        this.lastRefresh = now;
+        this.lastRefreshMs = nowMs;
         this.cluster = cluster;
         notifyAll();
         log.debug("Updated cluster metadata to {}", cluster);
@@ -141,7 +141,7 @@ public final class Metadata {
      * The last time metadata was updated.
      */
     public synchronized long lastUpdate() {
-        return this.lastRefresh;
+        return this.lastRefreshMs;
     }
 
 }
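
For reference, the update-gating logic above can be restated as a self-contained sketch in plain milliseconds; the field values here are hypothetical and only illustrate the arithmetic.

public class NeedsUpdateSketch {
    public static void main(String[] args) {
        long refreshBackoffMs = 100L;                               // hypothetical config
        long metadataExpireMs = 300_000L;                           // hypothetical config
        long lastRefreshMs = System.currentTimeMillis() - 400_000L; // pretend: refreshed 400 s ago
        boolean forceUpdate = false;

        long nowMs = System.currentTimeMillis();
        long msSinceLastUpdate = nowMs - lastRefreshMs;
        boolean updateAllowed = msSinceLastUpdate >= refreshBackoffMs;
        boolean updateNeeded = forceUpdate || msSinceLastUpdate >= metadataExpireMs;
        System.out.println("needs update: " + (updateAllowed && updateNeeded)); // true
    }
}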

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ffd13ff..2d7e52d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -93,27 +93,27 @@ public final class RecordAccumulator {
         metrics.addMetric("waiting-threads",
                           "The number of user threads blocked waiting for buffer memory to enqueue their records",
                           new Measurable() {
-                              public double measure(MetricConfig config, long now) {
+                              public double measure(MetricConfig config, long nowMs) {
                                   return free.queued();
                               }
                           });
         metrics.addMetric("buffer-total-bytes",
                           "The maximum amount of buffer memory the client can use (whether or not it is currently used).",
                           new Measurable() {
-                              public double measure(MetricConfig config, long now) {
+                              public double measure(MetricConfig config, long nowMs) {
                                   return free.totalMemory();
                               }
                           });
         metrics.addMetric("buffer-available-bytes",
                           "The total amount of buffer memory that is not being used (either unallocated or in the free list).",
                           new Measurable() {
-                              public double measure(MetricConfig config, long now) {
+                              public double measure(MetricConfig config, long nowMs) {
                                   return free.availableMemory();
                               }
                           });
         metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() {
-            public double measure(MetricConfig config, long now) {
-                return ready(now).size();
+            public double measure(MetricConfig config, long nowMs) {
+                return ready(nowMs).size();
             }
         });
     }
@@ -170,9 +170,9 @@ public final class RecordAccumulator {
     /**
      * Re-enqueue the given record batch in the accumulator to retry
      */
-    public void reenqueue(RecordBatch batch, long now) {
+    public void reenqueue(RecordBatch batch, long nowMs) {
         batch.attempts++;
-        batch.lastAttempt = now;
+        batch.lastAttemptMs = nowMs;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
@@ -191,7 +191,7 @@ public final class RecordAccumulator {
      * <li>The accumulator has been closed
      * </ol>
      */
-    public List<TopicPartition> ready(long now) {
+    public List<TopicPartition> ready(long nowMs) {
         List<TopicPartition> ready = new ArrayList<TopicPartition>();
         boolean exhausted = this.free.queued() > 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
@@ -199,9 +199,9 @@ public final class RecordAccumulator {
             synchronized (deque) {
                 RecordBatch batch = deque.peekFirst();
                 if (batch != null) {
-                    boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now;
+                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                     boolean full = deque.size() > 1 || batch.records.isFull();
-                    boolean expired = now - batch.created >= lingerMs;
+                    boolean expired = nowMs - batch.createdMs >= lingerMs;
                     boolean sendable = full || expired || exhausted || closed;
                     if (sendable && !backingOff)
                         ready.add(batch.topicPartition);
@@ -231,11 +231,11 @@ public final class RecordAccumulator {
      * 
      * @param partitions The list of partitions to drain
      * @param maxSize The maximum number of bytes to drain
-     * @param now The current unix time
+     * @param nowMs The current unix time in milliseconds
      * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize.
      *         TODO: There may be a starvation issue due to iteration order
      */
-    public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize, long now) {
+    public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize, long nowMs) {
         if (partitions.isEmpty())
             return Collections.emptyList();
         int size = 0;
@@ -258,7 +258,7 @@ public final class RecordAccumulator {
                         batch.records.close();
                         size += batch.records.sizeInBytes();
                         ready.add(batch);
-                        batch.drained = now;
+                        batch.drainedMs = nowMs;
                     }
                 }
             }
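
The per-batch readiness test above combines several millisecond-based conditions (retry backoff, linger expiry, batch full, memory exhausted, accumulator closed). A compact restatement with hypothetical values:

public class ReadinessSketch {
    public static void main(String[] args) {
        long nowMs = System.currentTimeMillis();
        long createdMs = nowMs - 10;      // batch created 10 ms ago
        long lastAttemptMs = createdMs;   // never retried
        int attempts = 0;
        long retryBackoffMs = 100L;       // hypothetical config
        long lingerMs = 5L;               // hypothetical config
        boolean full = false, exhausted = false, closed = false;

        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        boolean expired = nowMs - createdMs >= lingerMs;   // 10 ms >= 5 ms, so true
        boolean sendable = full || expired || exhausted || closed;
        System.out.println("ready: " + (sendable && !backingOff)); // true
    }
}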

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 94157f7..5ee5455 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -34,17 +34,17 @@ public final class RecordBatch {
     public int recordCount = 0;
     public int maxRecordSize = 0;
     public volatile int attempts = 0;
-    public final long created;
-    public long drained;
-    public long lastAttempt;
+    public final long createdMs;
+    public long drainedMs;
+    public long lastAttemptMs;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     private final ProduceRequestResult produceFuture;
     private final List<Thunk> thunks;
 
-    public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
-        this.created = now;
-        this.lastAttempt = now;
+    public RecordBatch(TopicPartition tp, MemoryRecords records, long nowMs) {
+        this.createdMs = nowMs;
+        this.lastAttemptMs = nowMs;
         this.records = records;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 9f2b2e9..f0152fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -188,23 +187,23 @@ public class Sender implements Runnable {
     /**
      * Run a single iteration of sending
      * 
-     * @param now The current time
+     * @param nowMs The current POSIX time in milliseconds
      */
-    public void run(long now) {
+    public void run(long nowMs) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
-        List<TopicPartition> ready = this.accumulator.ready(now);
+        List<TopicPartition> ready = this.accumulator.ready(nowMs);
 
         // should we update our metadata?
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
-        maybeUpdateMetadata(cluster, sends, now);
+        maybeUpdateMetadata(cluster, sends, nowMs);
 
         // prune the list of ready topics to eliminate any that we aren't ready to send yet
-        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
+        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, nowMs);
 
         // create produce requests
-        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize, now);
-        List<InFlightRequest> requests = collate(cluster, batches, now);
+        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize, nowMs);
+        List<InFlightRequest> requests = collate(cluster, batches, nowMs);
         sensors.updateProduceRequestMetrics(requests);
 
         if (ready.size() > 0) {
@@ -228,16 +227,16 @@ public class Sender implements Runnable {
 
         // handle responses, connections, and disconnections
         handleSends(this.selector.completedSends());
-        handleResponses(this.selector.completedReceives(), now);
-        handleDisconnects(this.selector.disconnected(), now);
+        handleResponses(this.selector.completedReceives(), nowMs);
+        handleDisconnects(this.selector.disconnected(), nowMs);
         handleConnects(this.selector.connected());
     }
 
     /**
      * Add a metadata request to the list of sends if we need to make one
      */
-    private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
-        if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
+    private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long nowMs) {
+        if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs))
             return;
 
         Node node = selectMetadataDestination(cluster);
@@ -247,13 +246,13 @@ public class Sender implements Runnable {
         if (nodeStates.isConnected(node.id())) {
             Set<String> topics = metadata.topics();
             this.metadataFetchInProgress = true;
-            InFlightRequest metadataRequest = metadataRequest(now, node.id(), topics);
+            InFlightRequest metadataRequest = metadataRequest(nowMs, node.id(), topics);
             log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
             sends.add(metadataRequest.request);
             this.inFlightRequests.add(metadataRequest);
-        } else if (nodeStates.canConnect(node.id(), now)) {
+        } else if (nodeStates.canConnect(node.id(), nowMs)) {
             // we don't have a connection to this node right now, make one
-            initiateConnect(node, now);
+            initiateConnect(node, nowMs);
         }
     }
 
@@ -315,7 +314,7 @@ public class Sender implements Runnable {
      * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
      * metadata to be able to do so
      */
-    private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) {
+    private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long nowMs) {
         List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
         for (TopicPartition tp : ready) {
             Node node = cluster.leaderFor(tp);
@@ -324,9 +323,9 @@ public class Sender implements Runnable {
                 metadata.forceUpdate();
             } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
                 sendable.add(tp);
-            } else if (nodeStates.canConnect(node.id(), now)) {
+            } else if (nodeStates.canConnect(node.id(), nowMs)) {
                 // we don't have a connection to this node right now, make one
-                initiateConnect(node, now);
+                initiateConnect(node, nowMs);
             }
         }
         return sendable;
@@ -335,11 +334,11 @@ public class Sender implements Runnable {
     /**
      * Initiate a connection to the given node
      */
-    private void initiateConnect(Node node, long now) {
+    private void initiateConnect(Node node, long nowMs) {
         try {
             log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
             selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
-            this.nodeStates.connecting(node.id(), now);
+            this.nodeStates.connecting(node.id(), nowMs);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
             nodeStates.disconnected(node.id());
@@ -352,7 +351,7 @@ public class Sender implements Runnable {
     /**
      * Handle any closed connections
      */
-    private void handleDisconnects(List<Integer> disconnects, long now) {
+    private void handleDisconnects(List<Integer> disconnects, long nowMs) {
         // clear out the in-flight requests for the disconnected broker
         for (int node : disconnects) {
             nodeStates.disconnected(node);
@@ -364,7 +363,7 @@ public class Sender implements Runnable {
                     case PRODUCE:
                         int correlation = request.request.header().correlationId();
                         for (RecordBatch batch : request.batches.values())
-                            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
+                            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, nowMs);
                         break;
                     case METADATA:
                         metadataFetchInProgress = false;
@@ -413,8 +412,7 @@ public class Sender implements Runnable {
     /**
      * Handle responses from the server
      */
-    private void handleResponses(List<NetworkReceive> receives, long now) {
-        long ns = time.nanoseconds();
+    private void handleResponses(List<NetworkReceive> receives, long nowMs) {
         for (NetworkReceive receive : receives) {
             int source = receive.source();
             InFlightRequest req = inFlightRequests.nextCompleted(source);
@@ -424,27 +422,27 @@ public class Sender implements Runnable {
             correlate(req.request.header(), header);
             if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) {
                 log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId());
-                handleProduceResponse(req, req.request.header(), body, now);
+                handleProduceResponse(req, req.request.header(), body, nowMs);
             } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
                 log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
                                                                                                                         .correlationId());
-                handleMetadataResponse(req.request.header(), body, now);
+                handleMetadataResponse(req.request.header(), body, nowMs);
             } else {
                 throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
             }
-            this.sensors.recordLatency(receive.source(), now - req.created, ns);
+            this.sensors.recordLatency(receive.source(), nowMs - req.createdMs);
         }
 
     }
 
-    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
+    private void handleMetadataResponse(RequestHeader header, Struct body, long nowMs) {
         this.metadataFetchInProgress = false;
         MetadataResponse response = new MetadataResponse(body);
         Cluster cluster = response.cluster();
         // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
         // created which means we will get errors and no nodes until it exists
         if (cluster.nodes().size() > 0)
-            this.metadata.update(cluster, now);
+            this.metadata.update(cluster, nowMs);
         else
             log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
     }
@@ -452,7 +450,7 @@ public class Sender implements Runnable {
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long now) {
+    private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long nowMs) {
         ProduceResponse pr = new ProduceResponse(body);
         for (Map<TopicPartition, ProduceResponse.PartitionResponse> responses : pr.responses().values()) {
             for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : responses.entrySet()) {
@@ -462,7 +460,7 @@ public class Sender implements Runnable {
                 if (error.exception() instanceof InvalidMetadataException)
                     metadata.forceUpdate();
                 RecordBatch batch = request.batches.get(tp);
-                completeBatch(batch, error, response.baseOffset, header.correlationId(), now);
+                completeBatch(batch, error, response.baseOffset, header.correlationId(), nowMs);
             }
         }
     }
@@ -473,9 +471,9 @@ public class Sender implements Runnable {
      * @param error The error (or null if none)
      * @param baseOffset The base offset assigned to the records if successful
      * @param correlationId The correlation id for the request
-     * @param now The current time stamp
+     * @param nowMs The current POSIX time stamp in milliseconds
      */
-    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
+    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long nowMs) {
         if (error != Errors.NONE && canRetry(batch, error)) {
             // retry
             log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
@@ -483,7 +481,7 @@ public class Sender implements Runnable {
                      batch.topicPartition,
                      this.retries - batch.attempts - 1,
                      error);
-            this.accumulator.reenqueue(batch, now);
+            this.accumulator.reenqueue(batch, nowMs);
             this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
         } else {
             // tell the user the result of their request
@@ -515,16 +513,16 @@ public class Sender implements Runnable {
     /**
      * Create a metadata request for the given topics
      */
-    private InFlightRequest metadataRequest(long now, int node, Set<String> topics) {
+    private InFlightRequest metadataRequest(long nowMs, int node, Set<String> topics) {
         MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
         RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct());
-        return new InFlightRequest(now, true, send, null);
+        return new InFlightRequest(nowMs, true, send, null);
     }
 
     /**
      * Collate the record batches into a list of produce requests on a per-node basis
      */
-    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches, long now) {
+    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches, long nowMs) {
         Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
         for (RecordBatch batch : batches) {
             Node node = cluster.leaderFor(batch.topicPartition);
@@ -537,14 +535,14 @@ public class Sender implements Runnable {
         }
         List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
         for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
-            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
+            requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue()));
         return requests;
     }
 
     /**
      * Create a produce request from the given record batches
      */
-    private InFlightRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
+    private InFlightRequest produceRequest(long nowMs, int destination, short acks, int timeout, List<RecordBatch> batches) {
         Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>();
         Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>();
         for (RecordBatch batch : batches) {
@@ -579,7 +577,7 @@ public class Sender implements Runnable {
         produce.set("topic_data", topicDatas.toArray());
 
         RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce);
-        return new InFlightRequest(now, acks != 0, send, batchesByPartition);
+        return new InFlightRequest(nowMs, acks != 0, send, batchesByPartition);
     }
 
     private RequestHeader header(ApiKeys key) {
@@ -605,15 +603,15 @@ public class Sender implements Runnable {
      */
     private static final class NodeState {
         private ConnectionState state;
-        private long lastConnectAttempt;
+        private long lastConnectAttemptMs;
 
         public NodeState(ConnectionState state, long lastConnectAttempt) {
             this.state = state;
-            this.lastConnectAttempt = lastConnectAttempt;
+            this.lastConnectAttemptMs = lastConnectAttempt;
         }
 
         public String toString() {
-            return "NodeState(" + state + ", " + lastConnectAttempt + ")";
+            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
         }
     }
 
@@ -626,16 +624,16 @@ public class Sender implements Runnable {
             this.nodeState = new HashMap<Integer, NodeState>();
         }
 
-        public boolean canConnect(int node, long now) {
+        public boolean canConnect(int node, long nowMs) {
             NodeState state = nodeState.get(node);
             if (state == null)
                 return true;
             else
-                return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs;
+                return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs;
         }
 
-        public void connecting(int node, long now) {
-            nodeState.put(node, new NodeState(ConnectionState.CONNECTING, now));
+        public void connecting(int node, long nowMs) {
+            nodeState.put(node, new NodeState(ConnectionState.CONNECTING, nowMs));
         }
 
         public boolean isConnected(int node) {
@@ -668,19 +666,19 @@ public class Sender implements Runnable {
      * A request that hasn't been fully processed yet
      */
     private static final class InFlightRequest {
-        public long created;
+        public long createdMs;
         public boolean expectResponse;
         public Map<TopicPartition, RecordBatch> batches;
         public RequestSend request;
 
         /**
-         * @param created The unix timestamp for the time at which this request was created.
+         * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
          * @param expectResponse Should we expect a response message or is this request complete once it is sent?
          * @param request The request
          * @param batches The record batches contained in the request if it is a produce request
          */
-        public InFlightRequest(long created, boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
-            this.created = created;
+        public InFlightRequest(long createdMs, boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
+            this.createdMs = createdMs;
             this.batches = batches;
             this.request = request;
             this.expectResponse = expectResponse;
@@ -804,13 +802,13 @@ public class Sender implements Runnable {
             this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
 
             this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
-                public double measure(MetricConfig config, long now) {
+                public double measure(MetricConfig config, long nowMs) {
                     return inFlightRequests.totalInFlightRequests();
                 }
             });
             metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return (TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS) - metadata.lastUpdate()) / 1000.0;
+                public double measure(MetricConfig config, long nowMs) {
+                    return (nowMs - metadata.lastUpdate()) / 1000.0;
                 }
             });
         }
@@ -839,7 +837,7 @@ public class Sender implements Runnable {
         }
 
         public void updateProduceRequestMetrics(List<InFlightRequest> requests) {
-            long ns = time.nanoseconds();
+            long nowMs = time.milliseconds();
             for (int i = 0; i < requests.size(); i++) {
                 InFlightRequest request = requests.get(i);
                 int records = 0;
@@ -862,36 +860,39 @@ public class Sender implements Runnable {
                         topicByteRate.record(batch.records.sizeInBytes());
 
                         // global metrics
-                        this.batchSizeSensor.record(batch.records.sizeInBytes(), ns);
-                        this.queueTimeSensor.record(batch.drained - batch.created, ns);
-                        this.maxRecordSizeSensor.record(batch.maxRecordSize, ns);
+                        this.batchSizeSensor.record(batch.records.sizeInBytes(), nowMs);
+                        this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, nowMs);
+                        this.maxRecordSizeSensor.record(batch.maxRecordSize, nowMs);
                         records += batch.recordCount;
                     }
-                    this.recordsPerRequestSensor.record(records, ns);
+                    this.recordsPerRequestSensor.record(records, nowMs);
                 }
             }
         }
 
         public void recordRetries(String topic, int count) {
-            this.retrySensor.record(count);
+            long nowMs = time.milliseconds();
+            this.retrySensor.record(count, nowMs);
             String topicRetryName = "topic." + topic + ".record-retries";
             Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
-            if (topicRetrySensor != null) topicRetrySensor.record(count);
+            if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs);
         }
 
         public void recordErrors(String topic, int count) {
-            this.errorSensor.record(count);
+            long nowMs = time.milliseconds();
+            this.errorSensor.record(count, nowMs);
             String topicErrorName = "topic." + topic + ".record-errors";
             Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
-            if (topicErrorSensor != null) topicErrorSensor.record(count);
+            if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs);
         }
 
-        public void recordLatency(int node, long latency, long nowNs) {
-            this.requestTimeSensor.record(latency, nowNs);
+        public void recordLatency(int node, long latency) {
+            long nowMs = time.milliseconds();
+            this.requestTimeSensor.record(latency, nowMs);
             if (node >= 0) {
                 String nodeTimeName = "node-" + node + ".latency";
                 Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
-                if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs);
+                if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs);
             }
         }
     }
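
The metadata-age measurable above shows the kind of bug this patch removes: the old code converted a nanoTime()-derived reading to milliseconds and then subtracted an epoch-millisecond lastUpdate(), mixing two unrelated clocks. With a wall-clock timestamp the age is a plain difference; a sketch with a hypothetical lastUpdate value:

public class MetadataAgeSketch {
    public static void main(String[] args) {
        long lastUpdateMs = System.currentTimeMillis() - 12_500L; // hypothetical lastUpdate()
        long nowMs = System.currentTimeMillis();
        double ageSeconds = (nowMs - lastUpdateMs) / 1000.0;
        System.out.printf("metadata-age: %.1f s%n", ageSeconds);  // ~12.5
    }
}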

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index b2426ac..a7458b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -55,12 +55,12 @@ public final class KafkaMetric implements Metric {
     @Override
     public double value() {
         synchronized (this.lock) {
-            return value(time.nanoseconds());
+            return value(time.milliseconds());
         }
     }
 
-    double value(long time) {
-        return this.measurable.measure(config, time);
+    double value(long timeMs) {
+        return this.measurable.measure(config, timeMs);
     }
 
     public void config(MetricConfig config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
index 0f405c3..7c2e33c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java
@@ -24,9 +24,9 @@ public interface Measurable {
     /**
      * Measure this quantity and return the result as a double
      * @param config The configuration for this metric
-     * @param now The time the measurement is being taken
+     * @param nowMs The POSIX time in milliseconds the measurement is being taken
      * @return The measured value
      */
-    public double measure(MetricConfig config, long now);
+    public double measure(MetricConfig config, long nowMs);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
index 4d14fbc..dfa1b0a 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
@@ -26,7 +26,7 @@ public class MetricConfig {
     private Quota quota;
     private int samples;
     private long eventWindow;
-    private long timeWindowNs;
+    private long timeWindowMs;
     private TimeUnit unit;
 
     public MetricConfig() {
@@ -34,7 +34,7 @@ public class MetricConfig {
         this.quota = null;
         this.samples = 2;
         this.eventWindow = Long.MAX_VALUE;
-        this.timeWindowNs = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+        this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
         this.unit = TimeUnit.SECONDS;
     }
 
@@ -56,12 +56,12 @@ public class MetricConfig {
         return this;
     }
 
-    public long timeWindowNs() {
-        return timeWindowNs;
+    public long timeWindowMs() {
+        return timeWindowMs;
     }
 
     public MetricConfig timeWindow(long window, TimeUnit unit) {
-        this.timeWindowNs = TimeUnit.NANOSECONDS.convert(window, unit);
+        this.timeWindowMs = TimeUnit.MILLISECONDS.convert(window, unit);
         return this;
     }
 

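With the window now stored in milliseconds, a caller configuring a 30 second window gets 30000 back from timeWindowMs(). A small sketch assuming only the post-patch MetricConfig shown above:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.metrics.MetricConfig;

public class TimeWindowSketch {
    public static void main(String[] args) {
        MetricConfig config = new MetricConfig().timeWindow(30, TimeUnit.SECONDS);
        System.out.println(config.timeWindowMs()); // 30000
    }
}
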
http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index d68349b..25c1faf 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -78,40 +78,40 @@ public final class Sensor {
      *         bound
      */
     public void record(double value) {
-        record(value, time.nanoseconds());
+        record(value, time.milliseconds());
     }
 
     /**
      * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse
      * the time stamp.
      * @param value The value we are recording
-     * @param time The time in nanoseconds
+     * @param timeMs The current POSIX time in milliseconds
      * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
      *         bound
      */
-    public void record(double value, long time) {
+    public void record(double value, long timeMs) {
         synchronized (this) {
             // increment all the stats
             for (int i = 0; i < this.stats.size(); i++)
-                this.stats.get(i).record(config, value, time);
-            checkQuotas(time);
+                this.stats.get(i).record(config, value, timeMs);
+            checkQuotas(timeMs);
         }
         for (int i = 0; i < parents.length; i++)
-            parents[i].record(value, time);
+            parents[i].record(value, timeMs);
     }
 
     /**
      * Check if we have violated our quota for any metric that has a configured quota
-     * @param time
+     * @param timeMs
      */
-    private void checkQuotas(long time) {
+    private void checkQuotas(long timeMs) {
         for (int i = 0; i < this.metrics.size(); i++) {
             KafkaMetric metric = this.metrics.get(i);
             MetricConfig config = metric.config();
             if (config != null) {
                 Quota quota = config.quota();
                 if (quota != null) {
-                    if (!quota.acceptable(metric.value(time)))
+                    if (!quota.acceptable(metric.value(timeMs)))
                         throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
                 }
             }
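
record(double) now stamps values with time.milliseconds(), and record(double, long timeMs) lets a caller read the wall clock once and reuse the timestamp across several recordings, as the Sender metrics code above does. A sketch of that pattern; it assumes the no-arg Metrics() constructor and the Metrics.sensor(String) factory from the client metrics package, which are not part of this patch:

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class SensorTimestampSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();                    // assumption: no-arg constructor
        Sensor latency = metrics.sensor("request-latency"); // assumption: sensor(String) factory
        latency.add("latency-avg", "Average request latency in ms", new Avg());
        latency.add("latency-max", "Maximum request latency in ms", new Max());

        long nowMs = System.currentTimeMillis();  // read the wall clock once...
        latency.record(42.0, nowMs);              // ...and reuse the timestamp
        latency.record(17.0, nowMs);
    }
}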

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java b/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java
index e02389c..0eb7ab2 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java
@@ -25,8 +25,8 @@ public interface Stat {
      * Record the given value
      * @param config The configuration to use for this metric
      * @param value The value to record
-     * @param time The time this value occurred
+     * @param timeMs The POSIX time in milliseconds this value occurred
      */
-    public void record(MetricConfig config, double value, long time);
+    public void record(MetricConfig config, double value, long timeMs);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
index 51725b2..c9963cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
@@ -31,12 +31,12 @@ public class Avg extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
+    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
         sample.value += value;
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
+    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
         double total = 0.0;
         long count = 0;
         for (int i = 0; i < samples.size(); i++) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
index 3cdd1d0..efcd61b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
@@ -31,12 +31,12 @@ public class Count extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
+    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
         sample.value += 1.0;
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
+    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
         double total = 0.0;
         for (int i = 0; i < samples.size(); i++)
             total += samples.get(i).value;

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
index bba5972..c492c38 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
@@ -31,12 +31,12 @@ public final class Max extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
+    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
         sample.value = Math.max(sample.value, value);
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
+    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
         double max = Double.NEGATIVE_INFINITY;
         for (int i = 0; i < samples.size(); i++)
             max = Math.max(max, samples.get(i).value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
index d370049..bd0919c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
@@ -31,12 +31,12 @@ public class Min extends SampledStat {
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
+    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
         sample.value = Math.min(sample.value, value);
     }
 
     @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
+    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
         double max = Double.MAX_VALUE;
         for (int i = 0; i < samples.size(); i++)
             max = Math.min(max, samples.get(i).value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index b47ed88..8300978 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -65,16 +65,16 @@ public class Percentiles extends SampledStat implements CompoundStat {
         for (Percentile percentile : this.percentiles) {
             final double pct = percentile.percentile();
             ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return value(config, now, pct / 100.0);
+                public double measure(MetricConfig config, long nowMs) {
+                    return value(config, nowMs, pct / 100.0);
                 }
             }));
         }
         return ms;
     }
 
-    public double value(MetricConfig config, long now, double quantile) {
-        purgeObsoleteSamples(config, now);
+    public double value(MetricConfig config, long nowMs, double quantile) {
+        purgeObsoleteSamples(config, nowMs);
         float count = 0.0f;
         for (Sample sample : this.samples)
             count += sample.eventCount;
@@ -94,17 +94,17 @@ public class Percentiles extends SampledStat implements CompoundStat {
         return Double.POSITIVE_INFINITY;
     }
 
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        return value(config, now, 0.5);
+    public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
+        return value(config, nowMs, 0.5);
     }
 
     @Override
-    protected HistogramSample newSample(long now) {
-        return new HistogramSample(this.binScheme, now);
+    protected HistogramSample newSample(long timeMs) {
+        return new HistogramSample(this.binScheme, timeMs);
     }
 
     @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
+    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
         HistogramSample hist = (HistogramSample) sample;
         hist.histogram.record(value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 7f5cc53..4b481a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -51,33 +51,33 @@ public class Rate implements MeasurableStat {
     }
 
     @Override
-    public void record(MetricConfig config, double value, long time) {
-        this.stat.record(config, value, time);
+    public void record(MetricConfig config, double value, long timeMs) {
+        this.stat.record(config, value, timeMs);
     }
 
     @Override
-    public double measure(MetricConfig config, long now) {
-        double value = stat.measure(config, now);
-        double elapsed = convert(now - stat.oldest(now).lastWindow);
+    public double measure(MetricConfig config, long nowMs) {
+        double value = stat.measure(config, nowMs);
+        double elapsed = convert(nowMs - stat.oldest(nowMs).lastWindowMs);
         return value / elapsed;
     }
 
     private double convert(long time) {
         switch (unit) {
             case NANOSECONDS:
-                return time;
+                return time * 1000.0 * 1000.0;
             case MICROSECONDS:
-                return time / 1000.0;
+                return time * 1000.0;
             case MILLISECONDS:
-                return time / (1000.0 * 1000.0);
+                return time;
             case SECONDS:
-                return time / (1000.0 * 1000.0 * 1000.0);
+                return time / (1000.0);
             case MINUTES:
-                return time / (60.0 * 1000.0 * 1000.0 * 1000.0);
+                return time / (60.0 * 1000.0);
             case HOURS:
-                return time / (60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
+                return time / (60.0 * 60.0 * 1000.0);
             case DAYS:
-                return time / (24.0 * 60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
+                return time / (24.0 * 60.0 * 60.0 * 1000.0);
             default:
                 throw new IllegalStateException("Unknown unit: " + unit);
         }
@@ -90,12 +90,12 @@ public class Rate implements MeasurableStat {
         }
 
         @Override
-        protected void update(Sample sample, MetricConfig config, double value, long now) {
+        protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
             sample.value += value;
         }
 
         @Override
-        public double combine(List<Sample> samples, MetricConfig config, long now) {
+        public double combine(List<Sample> samples, MetricConfig config, long nowMs) {
             double total = 0.0;
             for (int i = 0; i < samples.size(); i++)
                 total += samples.get(i).value;
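
Since elapsed time now arrives in milliseconds, convert() divides by 1000.0 for per-second rates and multiplies for sub-millisecond units. A worked example with hypothetical numbers:

public class RateConversionSketch {
    public static void main(String[] args) {
        double total = 500.0;      // hypothetical sum over all samples
        long elapsedMs = 20_000L;  // nowMs - oldest(nowMs).lastWindowMs
        double elapsedSeconds = elapsedMs / 1000.0;   // the SECONDS branch of convert()
        System.out.println(total / elapsedSeconds);   // 25.0 units per second
    }
}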

http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
index 776f3a1..0d4056f 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
@@ -40,90 +40,90 @@ public abstract class SampledStat implements MeasurableStat {
     }
 
     @Override
-    public void record(MetricConfig config, double value, long now) {
-        Sample sample = current(now);
-        if (sample.isComplete(now, config))
-            sample = advance(config, now);
-        update(sample, config, value, now);
+    public void record(MetricConfig config, double value, long timeMs) {
+        Sample sample = current(timeMs);
+        if (sample.isComplete(timeMs, config))
+            sample = advance(config, timeMs);
+        update(sample, config, value, timeMs);
         sample.eventCount += 1;
     }
 
-    private Sample advance(MetricConfig config, long now) {
+    private Sample advance(MetricConfig config, long timeMs) {
         this.current = (this.current + 1) % config.samples();
         if (this.current >= samples.size()) {
-            Sample sample = newSample(now);
+            Sample sample = newSample(timeMs);
             this.samples.add(sample);
             return sample;
         } else {
-            Sample sample = current(now);
-            sample.reset(now);
+            Sample sample = current(timeMs);
+            sample.reset(timeMs);
             return sample;
         }
     }
 
-    protected Sample newSample(long now) {
-        return new Sample(this.initialValue, now);
+    protected Sample newSample(long timeMs) {
+        return new Sample(this.initialValue, timeMs);
     }
 
     @Override
-    public double measure(MetricConfig config, long now) {
-        purgeObsoleteSamples(config, now);
-        return combine(this.samples, config, now);
+    public double measure(MetricConfig config, long nowMs) {
+        purgeObsoleteSamples(config, nowMs);
+        return combine(this.samples, config, nowMs);
     }
 
-    public Sample current(long now) {
+    public Sample current(long timeMs) {
         if (samples.size() == 0)
-            this.samples.add(newSample(now));
+            this.samples.add(newSample(timeMs));
         return this.samples.get(this.current);
     }
 
-    public Sample oldest(long now) {
+    public Sample oldest(long nowMs) {
         if (samples.size() == 0)
-            this.samples.add(newSample(now));
+            this.samples.add(newSample(nowMs));
         Sample oldest = this.samples.get(0);
         for (int i = 1; i < this.samples.size(); i++) {
             Sample curr = this.samples.get(i);
-            if (curr.lastWindow < oldest.lastWindow)
+            if (curr.lastWindowMs < oldest.lastWindowMs)
                 oldest = curr;
         }
         return oldest;
     }
 
-    protected abstract void update(Sample sample, MetricConfig config, double value, long now);
+    protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);
 
-    public abstract double combine(List<Sample> samples, MetricConfig config, long now);
+    public abstract double combine(List<Sample> samples, MetricConfig config, long nowMs);
 
     /* Timeout any windows that have expired in the absence of any events */
-    protected void purgeObsoleteSamples(MetricConfig config, long now) {
-        long expireAge = config.samples() * config.timeWindowNs();
+    protected void purgeObsoleteSamples(MetricConfig config, long nowMs) {
+        long expireAge = config.samples() * config.timeWindowMs();
         for (int i = 0; i < samples.size(); i++) {
             Sample sample = this.samples.get(i);
-            if (now - sample.lastWindow >= expireAge)
-                sample.reset(now);
+            if (nowMs - sample.lastWindowMs >= expireAge)
+                sample.reset(nowMs);
         }
     }
 
     protected static class Sample {
         public double initialValue;
         public long eventCount;
-        public long lastWindow;
+        public long lastWindowMs;
         public double value;
 
         public Sample(double initialValue, long now) {
             this.initialValue = initialValue;
             this.eventCount = 0;
-            this.lastWindow = now;
+            this.lastWindowMs = now;
             this.value = initialValue;
         }
 
         public void reset(long now) {
             this.eventCount = 0;
-            this.lastWindow = now;
+            this.lastWindowMs = now;
             this.value = initialValue;
         }
 
-        public boolean isComplete(long now, MetricConfig config) {
-            return now - lastWindow >= config.timeWindowNs() || eventCount >= config.eventWindow();
+        public boolean isComplete(long timeMs, MetricConfig config) {
+            return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
         }
     }
 

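For context on the millisecond-based windowing above, a minimal sketch (not part of the patch) that exercises Count, a SampledStat subclass, using only the MetricConfig and record/measure signatures that appear elsewhere in this diff. The 30000 ms window and the timestamps are illustrative values.

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.stats.Count;

public class SampledStatSketch {
    public static void main(String[] args) {
        // Two rolling samples, each covering a 30-second window (configured in ms).
        MetricConfig config = new MetricConfig()
            .timeWindow(30000, TimeUnit.MILLISECONDS)
            .samples(2);
        Count count = new Count();

        long nowMs = System.currentTimeMillis();
        count.record(config, 1.0, nowMs);        // recorded into the current sample
        count.record(config, 1.0, nowMs + 10);   // same window, so same sample
        // measure() first purges samples older than samples() * timeWindowMs().
        System.out.println(count.measure(config, nowMs + 20)); // prints 2.0
    }
}
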
http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
index a9940ed..53dd3d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
@@ -35,12 +35,12 @@ public class Total implements MeasurableStat {
     }
 
     @Override
-    public void record(MetricConfig config, double value, long time) {
+    public void record(MetricConfig config, double value, long timeMs) {
         this.total += value;
     }
 
     @Override
-    public double measure(MetricConfig config, long now) {
+    public double measure(MetricConfig config, long nowMs) {
         return this.total;
     }
 

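Total is the one stat here that ignores its timestamp entirely, so the rename to timeMs/nowMs is purely for naming consistency. A small illustrative sketch (not from the patch; assumes Total's no-argument constructor):

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.stats.Total;

public class TotalSketch {
    public static void main(String[] args) {
        MetricConfig config = new MetricConfig();
        Total total = new Total();
        total.record(config, 5.0, System.currentTimeMillis());
        total.record(config, 3.0, System.currentTimeMillis());
        // Total is cumulative; the timestamp is accepted but never used for windowing.
        System.out.println(total.measure(config, System.currentTimeMillis())); // 8.0
    }
}
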
http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 6027cb2..3e35898 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -210,7 +210,7 @@ public class Selector implements Selectable {
         long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
         long endSelect = time.nanoseconds();
-        this.sensors.selectTime.record(endSelect - startSelect, endSelect);
+        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 
         if (readyKeys > 0) {
             Set<SelectionKey> keys = this.selector.selectedKeys();
@@ -268,7 +268,7 @@ public class Selector implements Selectable {
             }
         }
         long endIo = time.nanoseconds();
-        this.sensors.ioTime.record(endIo - endSelect, endIo);
+        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
     }
 
     @Override
@@ -441,7 +441,7 @@ public class Selector implements Selectable {
             this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
 
             this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
-                public double measure(MetricConfig config, long now) {
+                public double measure(MetricConfig config, long nowMs) {
                     return keys.size();
                 }
             });
@@ -476,20 +476,22 @@ public class Selector implements Selectable {
         }
 
         public void recordBytesSent(int node, int bytes) {
-            this.bytesSent.record(bytes);
+            long nowMs = time.milliseconds();
+            this.bytesSent.record(bytes, nowMs);
             if (node >= 0) {
                 String nodeRequestName = "node-" + node + ".bytes-sent";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null) nodeRequest.record(bytes);
+                if (nodeRequest != null) nodeRequest.record(bytes, nowMs);
             }
         }
 
         public void recordBytesReceived(int node, int bytes) {
-            this.bytesReceived.record(bytes);
+            long nowMs = time.milliseconds();
+            this.bytesReceived.record(bytes, nowMs);
             if (node >= 0) {
                 String nodeRequestName = "node-" + node + ".bytes-received";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null) nodeRequest.record(bytes);
+                if (nodeRequest != null) nodeRequest.record(bytes, nowMs);
             }
         }
     }

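The convention applied throughout Selector above: durations are still measured with time.nanoseconds(), while the timestamp handed to Sensor.record(value, timeMs) is time.milliseconds(), so sample windows line up with the wall clock. A sketch of that pattern (the method name, the Runnable, and the passed-in sensor/time are stand-ins, not names from the patch):

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

public class TimingSketch {
    // Measure a duration in nanoseconds, but stamp the sample in milliseconds.
    static void timeAndRecord(Sensor sensor, Time time, Runnable work) {
        long startNs = time.nanoseconds();
        work.run();
        long elapsedNs = time.nanoseconds() - startNs;
        sensor.record(elapsedNs, time.milliseconds());
    }
}
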
http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
index 6582c73..d682bd4 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -26,6 +26,7 @@ public class SystemTime implements Time {
         return System.currentTimeMillis();
     }
 
+    @Override
     public long nanoseconds() {
         return System.nanoTime();
     }

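For reference, the Time contract SystemTime implements, as exercised by this patch (a reconstructed sketch of the relevant methods, not a copy of the actual interface file): milliseconds() backs metric sample timestamps, nanoseconds() backs elapsed-time measurement.

public interface Time {
    long milliseconds();   // wall-clock time, used for metric sample timestamps
    long nanoseconds();    // monotonic time, used for measuring durations
    void sleep(long ms);   // block or advance; a mock implementation can fake this in tests
}
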
http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 9ff73f4..e4e0a04 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -117,24 +117,24 @@ public class MetricsTest {
     public void testEventWindowing() {
         Count count = new Count();
         MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
-        count.record(config, 1.0, time.nanoseconds());
-        count.record(config, 1.0, time.nanoseconds());
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
-        count.record(config, 1.0, time.nanoseconds()); // first event times out
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
+        count.record(config, 1.0, time.milliseconds());
+        count.record(config, 1.0, time.milliseconds());
+        assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
+        count.record(config, 1.0, time.milliseconds()); // first event times out
+        assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
     }
 
     @Test
     public void testTimeWindowing() {
         Count count = new Count();
         MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2);
-        count.record(config, 1.0, time.nanoseconds());
+        count.record(config, 1.0, time.milliseconds());
         time.sleep(1);
-        count.record(config, 1.0, time.nanoseconds());
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
+        count.record(config, 1.0, time.milliseconds());
+        assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
         time.sleep(1);
-        count.record(config, 1.0, time.nanoseconds()); // oldest event times out
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
+        count.record(config, 1.0, time.milliseconds()); // oldest event times out
+        assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
     }
 
     @Test
@@ -143,9 +143,9 @@ public class MetricsTest {
         long windowMs = 100;
         int samples = 2;
         MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
-        max.record(config, 50, time.nanoseconds());
+        max.record(config, 50, time.milliseconds());
         time.sleep(samples * windowMs);
-        assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS);
+        assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -213,7 +213,7 @@ public class MetricsTest {
         public double value = 0.0;
 
         @Override
-        public double measure(MetricConfig config, long now) {
+        public double measure(MetricConfig config, long nowMs) {
             return value;
         }
 

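The time object in these tests is a mock clock whose sleep() advances the reading deterministically; that is what lets the window-expiry assertions pass now that the tests record millisecond timestamps. A minimal sketch of such a clock (FakeTime is a hypothetical name, not the project's own test utility):

import org.apache.kafka.common.utils.Time;

public class FakeTime implements Time {
    private long ms = 0;

    @Override
    public long milliseconds() { return ms; }

    @Override
    public long nanoseconds() { return ms * 1000000L; }

    @Override
    public void sleep(long sleepMs) { ms += sleepMs; }
}
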
http://git-wip-us.apache.org/repos/asf/kafka/blob/93af67cd/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index 6aab854..b24d4de 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -84,7 +84,7 @@ public class Microbenchmarks {
                         counter++;
                     }
                 }
-                System.out.println("synchronized: " + ((System.nanoTime() - start) / iters));
+                System.out.println("synchronized: " + ((time.nanoseconds() - start) / iters));
                 System.out.println(counter);
                 done.set(true);
             }
@@ -121,7 +121,7 @@ public class Microbenchmarks {
                     counter++;
                     lock2.unlock();
                 }
-                System.out.println("lock: " + ((System.nanoTime() - start) / iters));
+                System.out.println("lock: " + ((time.nanoseconds() - start) / iters));
                 System.out.println(counter);
                 done.set(true);
             }


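The change above routes the benchmark's elapsed-time reads through the injected Time instead of calling System.nanoTime() directly; the averaged-time pattern itself is unchanged. Sketched below with placeholder names (the label, iters, and the Runnable are illustrative):

import org.apache.kafka.common.utils.Time;

public class BenchSketch {
    // Print average nanoseconds per iteration, read through the injected clock.
    static void bench(String label, Time time, int iters, Runnable body) {
        long start = time.nanoseconds();
        for (int i = 0; i < iters; i++)
            body.run();
        System.out.println(label + ": " + ((time.nanoseconds() - start) / iters));
    }
}
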