kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5506; Fix NPE in OffsetFetchRequest.toString and logging improvements
Date Sat, 24 Jun 2017 07:21:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ee5eac715 -> a5c47db13


KAFKA-5506; Fix NPE in OffsetFetchRequest.toString and logging improvements

NetworkClient's logging improvements:
- Include correlation id in a number of log statements
- Avoid eager toString call in parameter passed to log.debug
- Use node.toString instead of passing a subset of fields to the
logger
- Use requestBuilder instead of clientRequest in one of the log
statements

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Damian Guy <damian.guy@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3420 from ijuma/kafka-5506-offset-fetch-request-to-string-npe


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5c47db1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5c47db1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5c47db1

Branch: refs/heads/trunk
Commit: a5c47db1382c14720106ae1da20d2b332f89c22c
Parents: ee5eac7
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Sat Jun 24 08:20:44 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 24 08:20:55 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 44 ++++++++++----------
 .../common/requests/OffsetFetchRequest.java     |  3 +-
 .../common/requests/RequestResponseTest.java    | 10 +++++
 3 files changed, 35 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5c47db1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index af96575..59c606f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -251,7 +251,7 @@ public class NetworkClient implements KafkaClient {
         }
         connectionStates.disconnected(nodeId, now);
         if (log.isDebugEnabled()) {
-            log.debug("Manually disconnected from {}.  Removed requests: {}.", nodeId,
+            log.debug("Manually disconnected from {}. Removed requests: {}.", nodeId,
                 Utils.join(requestTypes, ", "));
         }
     }
@@ -360,8 +360,8 @@ public class NetworkClient implements KafkaClient {
             if (versionInfo == null) {
                 version = builder.desiredOrLatestVersion();
                 if (discoverBrokerVersions && log.isTraceEnabled())
-                    log.trace("No version information found when sending message of type {} to node {}. " +
-                            "Assuming version {}.", clientRequest.apiKey(), nodeId, version);
+                    log.trace("No version information found when sending {} with correlation id {} to node {}. " +
+                            "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
             } else {
                 version = versionInfo.usableVersion(clientRequest.apiKey(), builder.desiredVersion());
             }
@@ -371,8 +371,8 @@ public class NetworkClient implements KafkaClient {
         } catch (UnsupportedVersionException e) {
             // If the version is not supported, skip sending the request over the wire.
             // Instead, simply add it to the local queue of aborted requests.
-            log.debug("Version mismatch when attempting to send {} to {}",
-                    clientRequest.toString(), clientRequest.destination(), e);
+            log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
+                    clientRequest.correlationId(), clientRequest.destination(), e);
             ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),
                     clientRequest.callback(), clientRequest.destination(), now, now,
                     false, e, null);
@@ -386,10 +386,11 @@ public class NetworkClient implements KafkaClient {
         if (log.isDebugEnabled()) {
             int latestClientVersion = clientRequest.apiKey().latestVersion();
             if (header.apiVersion() == latestClientVersion) {
-                log.trace("Sending {} {} to node {}.", clientRequest.apiKey(), request, nodeId);
+                log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
+                        clientRequest.correlationId(), nodeId);
             } else {
-                log.debug("Using older server API v{} to send {} {} to node {}.",
-                        header.apiVersion(), clientRequest.apiKey(), request, nodeId);
+                log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
+                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId);
             }
         }
         Send send = request.toSend(nodeId, header);
@@ -554,8 +555,8 @@ public class NetworkClient implements KafkaClient {
     private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
                                                                     Sensor throttleTimeSensor, long now) {
         ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
-        // Always expect the response version id to be the same as the request version id
         ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+        // Always expect the response version id to be the same as the request version id
         Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer);
         correlate(requestHeader, responseHeader);
         if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME))
@@ -591,7 +592,8 @@ public class NetworkClient implements KafkaClient {
                 break; // Disconnections in other states are logged at debug level in Selector
         }
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
-            log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId);
+            log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", request.request,
+                    request.header.correlationId(), nodeId);
             if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
                 metadataUpdater.handleDisconnection(request.destination);
             else
@@ -655,8 +657,8 @@ public class NetworkClient implements KafkaClient {
             Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                 throttleTimeSensor, now);
             if (log.isTraceEnabled()) {
-                log.trace("Completed receive from node {}, for key {}, received {}", req.destination,
-                    req.header.apiKey(), responseStruct.toString());
+                log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
+                    ApiKeys.forId(req.header.apiKey()), req.header.correlationId(), responseStruct);
             }
             AbstractResponse body = createResponse(responseStruct, req.header);
             if (req.isInternalRequest && body instanceof MetadataResponse)
@@ -673,8 +675,8 @@ public class NetworkClient implements KafkaClient {
         final String node = req.destination;
         if (apiVersionsResponse.error() != Errors.NONE) {
             if (req.request.version() == 0 || apiVersionsResponse.error() != Errors.UNSUPPORTED_VERSION) {
-                log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.",
-                        node, apiVersionsResponse.error());
+                log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.",
+                        apiVersionsResponse.error(), node, req.header.correlationId());
                 this.selector.close(node);
                 processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
             } else {
@@ -719,10 +721,10 @@ public class NetworkClient implements KafkaClient {
             if (discoverBrokerVersions) {
                 this.connectionStates.checkingApiVersions(node);
                 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
-                log.debug("Completed connection to node {}.  Fetching API versions.", node);
+                log.debug("Completed connection to node {}. Fetching API versions.", node);
             } else {
                 this.connectionStates.ready(node);
-                log.debug("Completed connection to node {}.  Ready.", node);
+                log.debug("Completed connection to node {}. Ready.", node);
             }
         }
     }
@@ -757,7 +759,7 @@ public class NetworkClient implements KafkaClient {
     private void initiateConnect(Node node, long now) {
         String nodeConnectionId = node.idString();
         try {
-            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
+            log.debug("Initiating connection to node {}", node);
             this.connectionStates.connecting(nodeConnectionId, now);
             selector.connect(nodeConnectionId,
                              new InetSocketAddress(node.host(), node.port()),
@@ -768,7 +770,7 @@ public class NetworkClient implements KafkaClient {
             connectionStates.disconnected(nodeConnectionId, now);
             /* maybe the problem is our metadata, update it */
             metadataUpdater.requestUpdate();
-            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
+            log.debug("Error connecting to node {}", node, e);
         }
     }
 
@@ -828,7 +830,7 @@ public class NetworkClient implements KafkaClient {
                 int nodeId = Integer.parseInt(destination);
                 Node node = cluster.nodeById(nodeId);
                 if (node != null)
-                    log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
+                    log.warn("Bootstrap broker {} disconnected", node);
             }
 
             metadataFetchInProgress = false;
@@ -886,7 +888,7 @@ public class NetworkClient implements KafkaClient {
                             metadata.allowAutoTopicCreation());
 
 
-                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
+                log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                 sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                 return requestTimeoutMs;
             }
@@ -902,7 +904,7 @@ public class NetworkClient implements KafkaClient {
 
             if (connectionStates.canConnect(nodeConnectionId, now)) {
                 // we don't have a connection to this node right now, make one
-                log.debug("Initialize connection to node {} for sending metadata request", node.id());
+                log.debug("Initialize connection to node {} for sending metadata request", node);
                 initiateConnect(node, now);
                 return reconnectBackoffMs;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5c47db1/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 1d810e9..15fdf57 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -71,9 +71,10 @@ public class OffsetFetchRequest extends AbstractRequest {
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
+            String partitionsString = partitions == null ? "<ALL>" : Utils.join(partitions, ",");
             bld.append("(type=OffsetFetchRequest, ").
                     append("groupId=").append(groupId).
-                    append(", partitions=").append(Utils.join(partitions, ",")).
+                    append(", partitions=").append(partitionsString).
                     append(")");
             return bld.toString();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5c47db1/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 467afb3..a3c277f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -554,6 +554,16 @@ public class RequestResponseTest {
         assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout());
     }
 
+    @Test
+    public void testOffsetFetchRequestBuilderToString() {
+        String allTopicPartitionsString = OffsetFetchRequest.Builder.allTopicPartitions("someGroup").toString();
+        assertTrue(allTopicPartitionsString.contains("<ALL>"));
+        String string = new OffsetFetchRequest.Builder("group1",
+                singletonList(new TopicPartition("test11", 1))).toString();
+        assertTrue(string.contains("test11"));
+        assertTrue(string.contains("group1"));
+    }
+
     private RequestHeader createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);
     }


Mime
View raw message