kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: MINOR: Set `sendTime` in `doSend` instead of `InFlightRequests.add` and rename method names for consistency
Date Thu, 01 Oct 2015 23:30:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5e769ccd5 -> d03b871dd


MINOR: Set `sendTime` in `doSend` instead of `InFlightRequests.add` and rename method names
for consistency

hachikuji MayureshGharat jjkoshy Thoughts?

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Mayuresh Gharat <gharatmayuresh15@gmail.com>, Jason Gustafson <jason@confluent.io>,
Joel Koshy <jjkoshy.w@gmail.com>

Closes #264 from ijuma/tweak-send-ms


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d03b871d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d03b871d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d03b871d

Branch: refs/heads/trunk
Commit: d03b871dd0c395d372e6a7d1ff027105380f588b
Parents: 5e769cc
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Oct 1 16:30:31 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Oct 1 16:30:31 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java | 32 +++++++++++---------
 .../apache/kafka/clients/ClientResponse.java    | 16 +++++-----
 .../apache/kafka/clients/InFlightRequests.java  |  9 +++---
 .../org/apache/kafka/clients/NetworkClient.java |  3 +-
 .../org/apache/kafka/clients/MockClient.java    |  2 +-
 5 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d03b871d/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 6410f09..117b0bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -19,35 +19,35 @@ import org.apache.kafka.common.requests.RequestSend;
  */
 public final class ClientRequest {
 
-    private final long createdMs;
+    private final long createdTimeMs;
     private final boolean expectResponse;
     private final RequestSend request;
     private final RequestCompletionHandler callback;
     private final boolean isInitiatedByNetworkClient;
-    private long sendMs;
+    private long sendTimeMs;
 
     /**
-     * @param createdMs The unix timestamp in milliseconds for the time at which this request
was created.
+     * @param createdTimeMs The unix timestamp in milliseconds for the time at which this
request was created.
      * @param expectResponse Should we expect a response message or is this request complete
once it is sent?
      * @param request The request
      * @param callback A callback to execute when the response has been received (or null
if no callback is necessary)
      */
-    public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
+    public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
                          RequestCompletionHandler callback) {
-        this(createdMs, expectResponse, request, callback, false);
+        this(createdTimeMs, expectResponse, request, callback, false);
     }
 
     /**
-     * @param createdMs The unix timestamp in milliseconds for the time at which this request
was created.
+     * @param createdTimeMs The unix timestamp in milliseconds for the time at which this
request was created.
      * @param expectResponse Should we expect a response message or is this request complete
once it is sent?
      * @param request The request
      * @param callback A callback to execute when the response has been received (or null
if no callback is necessary)
      * @param isInitiatedByNetworkClient Is request initiated by network client, if yes,
its
      *                                   response will be consumed by network client
      */
-    public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
+    public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
                          RequestCompletionHandler callback, boolean isInitiatedByNetworkClient)
{
-        this.createdMs = createdMs;
+        this.createdTimeMs = createdTimeMs;
         this.callback = callback;
         this.request = request;
         this.expectResponse = expectResponse;
@@ -60,6 +60,8 @@ public final class ClientRequest {
             ", callback=" + callback +
             ", request=" + request +
             (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") +
+            ", createdTimeMs=" + createdTimeMs +
+            ", sendTimeMs=" + sendTimeMs +
             ")";
     }
 
@@ -79,19 +81,19 @@ public final class ClientRequest {
         return callback;
     }
 
-    public long createdTime() {
-        return createdMs;
+    public long createdTimeMs() {
+        return createdTimeMs;
     }
 
     public boolean isInitiatedByNetworkClient() {
         return isInitiatedByNetworkClient;
     }
 
-    public long getSendMs() {
-        return sendMs;
+    public long sendTimeMs() {
+        return sendTimeMs;
     }
 
-    public void setSendMs(long sendMs) {
-        this.sendMs = sendMs;
+    public void setSendTimeMs(long sendTimeMs) {
+        this.sendTimeMs = sendTimeMs;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d03b871d/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 14ef69a..3b6f955 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -20,27 +20,27 @@ import org.apache.kafka.common.protocol.types.Struct;
  */
 public class ClientResponse {
 
-    private final long received;
+    private final long receivedTimeMs;
     private final boolean disconnected;
     private final ClientRequest request;
     private final Struct responseBody;
 
     /**
      * @param request The original request
-     * @param received The unix timestamp when this response was received
+     * @param receivedTimeMs The unix timestamp when this response was received
      * @param disconnected Whether the client disconnected before fully reading a response
      * @param responseBody The response contents (or null) if we disconnected or no response
was expected
      */
-    public ClientResponse(ClientRequest request, long received, boolean disconnected, Struct
responseBody) {
+    public ClientResponse(ClientRequest request, long receivedTimeMs, boolean disconnected,
Struct responseBody) {
         super();
-        this.received = received;
+        this.receivedTimeMs = receivedTimeMs;
         this.disconnected = disconnected;
         this.request = request;
         this.responseBody = responseBody;
     }
 
-    public long receivedTime() {
-        return received;
+    public long receivedTimeMs() {
+        return receivedTimeMs;
     }
 
     public boolean wasDisconnected() {
@@ -60,12 +60,12 @@ public class ClientResponse {
     }
 
     public long requestLatencyMs() {
-        return receivedTime() - this.request.createdTime();
+        return receivedTimeMs() - this.request.createdTimeMs();
     }
 
     @Override
     public String toString() {
-        return "ClientResponse(received=" + received +
+        return "ClientResponse(receivedTimeMs=" + receivedTimeMs +
                ", disconnected=" +
                disconnected +
                ", request=" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/d03b871d/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index f9956af..8de19ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -35,13 +35,12 @@ final class InFlightRequests {
     /**
      * Add the given request to the queue for the connection it was directed to
      */
-    public void add(ClientRequest request, long now) {
+    public void add(ClientRequest request) {
         Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
         if (reqs == null) {
-            reqs = new ArrayDeque<ClientRequest>();
+            reqs = new ArrayDeque<>();
             this.requests.put(request.request().destination(), reqs);
         }
-        request.setSendMs(now);
         reqs.addFirst(request);
     }
 
@@ -138,7 +137,7 @@ final class InFlightRequests {
         for (String nodeId : requests.keySet()) {
             if (inFlightRequestCount(nodeId) > 0) {
                 ClientRequest request = requests.get(nodeId).peekLast();
-                long timeSinceSend = now - request.getSendMs();
+                long timeSinceSend = now - request.sendTimeMs();
                 if (timeSinceSend > requestTimeout) {
                     nodeIds.add(nodeId);
                 }
@@ -147,4 +146,4 @@ final class InFlightRequests {
 
         return nodeIds;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d03b871d/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8475a76..0275daf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -238,7 +238,8 @@ public class NetworkClient implements KafkaClient {
     }
 
     private void doSend(ClientRequest request, long now) {
-        this.inFlightRequests.add(request, now);
+        request.setSendTimeMs(now);
+        this.inFlightRequests.add(request);
         selector.send(request.request());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d03b871d/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ee72328..4dfdd2a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -109,7 +109,7 @@ public class MockClient implements KafkaClient {
             ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected,
futureResp.responseBody);
             responses.add(resp);
         } else {
-            request.setSendMs(now);
+            request.setSendTimeMs(now);
             this.requests.add(request);
         }
     }


Mime
View raw message