kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5762; Refactor AdminClient to use LogContext
Date Wed, 30 Aug 2017 11:28:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bd54d2e3e -> 10cb534c8


KAFKA-5762; Refactor AdminClient to use LogContext

- client id is part of the log context, so removed ad-hoc usages
- Fixed an issue where the response was not printed correctly,
use `toString(version)` instead of `toString()`
- Capitalized all log statements for consistency
- Fixed a number of double spaces after period

Author: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3741 from Kamal15/kafka-5762


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10cb534c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10cb534c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10cb534c

Branch: refs/heads/trunk
Commit: 10cb534c8b18997301553db6debfe91bf596500e
Parents: bd54d2e
Author: Kamal C <kamal.chandraprakash@gmail.com>
Authored: Wed Aug 30 11:32:40 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Aug 30 12:27:54 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   | 112 +++++++++----------
 1 file changed, 55 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10cb534c/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a08aa19..a5f1d5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -81,9 +81,9 @@ import org.apache.kafka.common.requests.Resource;
 import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -111,7 +111,6 @@ import static org.apache.kafka.common.utils.Utils.closeQuietly;
  */
 @InterfaceStability.Evolving
 public class KafkaAdminClient extends AdminClient {
-    private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class);
 
     /**
      * The next integer to use to name a KafkaAdminClient which the user hasn't specified
an explicit name for.
@@ -128,6 +127,8 @@ public class KafkaAdminClient extends AdminClient {
      */
     private static final long INVALID_SHUTDOWN_TIME = -1;
 
+    private final Logger log;
+
     /**
      * The default timeout to use for an operation.
      */
@@ -170,7 +171,7 @@ public class KafkaAdminClient extends AdminClient {
 
     /**
      * During a close operation, this is the time at which we will time out all pending operations
-     * and force the RPC thread to exit.  If the admin client is not closing, this will be
0.
+     * and force the RPC thread to exit. If the admin client is not closing, this will be
0.
      */
     private final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME);
 
@@ -340,6 +341,8 @@ public class KafkaAdminClient extends AdminClient {
                      Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory)
{
         this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.clientId = clientId;
+        LogContext logContext = new LogContext("[AdminClient clientId=" + clientId + "] ");
+        this.log = logContext.logger(KafkaAdminClient.class);
         this.time = time;
         this.metadata = metadata;
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
@@ -355,7 +358,7 @@ public class KafkaAdminClient extends AdminClient {
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
         config.logUnused();
         AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
-        log.debug("Kafka admin client with client id {} created", this.clientId);
+        log.debug("Kafka admin client initialized");
         thread.start();
     }
 
@@ -369,23 +372,23 @@ public class KafkaAdminClient extends AdminClient {
         while (true) {
             if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
                 if (prev == INVALID_SHUTDOWN_TIME) {
-                    log.debug("{}: initiating close operation.", clientId);
+                    log.debug("Initiating close operation.");
                 } else {
-                    log.debug("{}: moving hard shutdown time forward.", clientId);
+                    log.debug("Moving hard shutdown time forward.");
                 }
                 client.wakeup(); // Wake the thread, if it is blocked inside poll().
                 break;
             }
             prev = hardShutdownTimeMs.get();
             if (prev < newHardShutdownTimeMs) {
-                log.debug("{}: hard shutdown time is already earlier than requested.", clientId);
+                log.debug("Hard shutdown time is already earlier than requested.");
                 newHardShutdownTimeMs = prev;
                 break;
             }
         }
         if (log.isDebugEnabled()) {
             long deltaMs = Math.max(0, newHardShutdownTimeMs - time.milliseconds());
-            log.debug("{}: waiting for the I/O thread to exit. Hard shutdown in {} ms.",
clientId, deltaMs);
+            log.debug("Waiting for the I/O thread to exit. Hard shutdown in {} ms.", deltaMs);
         }
         try {
             // Wait for the thread to be joined.
@@ -393,9 +396,9 @@ public class KafkaAdminClient extends AdminClient {
 
             AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
             
-            log.debug("{}: closed.", clientId);
+            log.debug("Kafka admin client closed.");
         } catch (InterruptedException e) {
-            log.debug("{}: interrupted while joining I/O thread", clientId, e);
+            log.debug("Interrupted while joining I/O thread", e);
             Thread.currentThread().interrupt();
         }
     }
@@ -457,7 +460,7 @@ public class KafkaAdminClient extends AdminClient {
          * Handle a failure.
          *
          * Depending on what the exception is and how many times we have already tried, we
may choose to
-         * fail the Call, or retry it.  It is important to print the stack traces here in
some cases,
+         * fail the Call, or retry it. It is important to print the stack traces here in
some cases,
          * since they are not necessarily preserved in ApiVersionException objects.
          *
          * @param now           The current time in milliseconds.
@@ -466,7 +469,7 @@ public class KafkaAdminClient extends AdminClient {
         final void fail(long now, Throwable throwable) {
             if (aborted) {
                 // If the call was aborted while in flight due to a timeout, deliver a
-                // TimeoutException.  In this case, we do not get any more retries - the
call has
+                // TimeoutException. In this case, we do not get any more retries - the call
has
                 // failed. We increment tries anyway in order to display an accurate log
message.
                 tries++;
                 if (log.isDebugEnabled()) {
@@ -476,7 +479,7 @@ public class KafkaAdminClient extends AdminClient {
                 handleFailure(new TimeoutException("Aborted due to timeout."));
                 return;
             }
-            // If this is an UnsupportedVersionException that we can retry, do so.  Note
that a
+            // If this is an UnsupportedVersionException that we can retry, do so. Note that
a
             // protocol downgrade will not count against the total number of retries we get
for
             // this RPC. That is why 'tries' is not incremented.
             if ((throwable instanceof UnsupportedVersionException) &&
@@ -514,7 +517,7 @@ public class KafkaAdminClient extends AdminClient {
                 return;
             }
             if (log.isDebugEnabled()) {
-                log.debug("{} failed: {}.  Beginning retry #{}",
+                log.debug("{} failed: {}. Beginning retry #{}",
                     this, prettyPrintException(throwable), tries);
             }
             runnable.enqueue(this, now);
@@ -534,12 +537,11 @@ public class KafkaAdminClient extends AdminClient {
          *
          * @param abstractResponse  The AbstractResponse.
          *
-         * @return                  True if the response has been processed; false to re-submit
the request.
          */
         abstract void handleResponse(AbstractResponse abstractResponse);
 
         /**
-         * Handle a failure.  This will only be called if the failure exception was not
+         * Handle a failure. This will only be called if the failure exception was not
          * retryable, or if we hit a timeout.
          *
          * @param throwable     The exception.
@@ -638,7 +640,7 @@ public class KafkaAdminClient extends AdminClient {
 
     private final class AdminClientRunnable implements Runnable {
         /**
-         * Pending calls.  Protected by the object monitor.
+         * Pending calls. Protected by the object monitor.
          * This will be null only if the thread has shut down.
          */
         private List<Call> newCalls = new LinkedList<>();
@@ -658,15 +660,15 @@ public class KafkaAdminClient extends AdminClient {
             }
             Cluster cluster = metadata.fetch();
             if (cluster.nodes().isEmpty()) {
-                log.trace("{}: metadata is not ready yet.  No cluster nodes found.", clientId);
+                log.trace("Metadata is not ready yet. No cluster nodes found.");
                 return metadata.requestUpdate();
             }
             if (cluster.controller() == null) {
-                log.trace("{}: metadata is not ready yet.  No controller found.", clientId);
+                log.trace("Metadata is not ready yet. No controller found.");
                 return metadata.requestUpdate();
             }
             if (prevMetadataVersion != null) {
-                log.trace("{}: metadata is now ready.", clientId);
+                log.trace("Metadata is now ready.");
             }
             return null;
         }
@@ -680,7 +682,7 @@ public class KafkaAdminClient extends AdminClient {
             int numTimedOut = processor.handleTimeouts(newCalls,
                     "Timed out waiting for a node assignment.");
             if (numTimedOut > 0)
-                log.debug("{}: timed out {} new calls.", clientId, numTimedOut);
+                log.debug("Timed out {} new calls.", numTimedOut);
         }
 
         /**
@@ -696,7 +698,7 @@ public class KafkaAdminClient extends AdminClient {
                     "Timed out waiting to send the call.");
             }
             if (numTimedOut > 0)
-                log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut);
+                log.debug("Timed out {} call(s) with assigned nodes.", numTimedOut);
         }
 
         /**
@@ -708,7 +710,6 @@ public class KafkaAdminClient extends AdminClient {
          * @param now           The current time in milliseconds.
          * @param callsToSend   A map of nodes to the calls they need to handle.
          *
-         * @return              The new calls we need to process.
          */
         private void chooseNodesForNewCalls(long now, Map<Node, List<Call>> callsToSend)
{
             List<Call> newCallsToAdd = null;
@@ -738,7 +739,7 @@ public class KafkaAdminClient extends AdminClient {
                     String.format("Error choosing node for %s: no node found.", call.callName)));
                 return;
             }
-            log.trace("{}: assigned {} to {}", clientId, call, node);
+            log.trace("Assigned {} to {}", call, node);
             getOrCreateListValue(callsToSend, node).add(call);
         }
 
@@ -767,7 +768,7 @@ public class KafkaAdminClient extends AdminClient {
                 if (!client.ready(node, now)) {
                     long nodeTimeout = client.connectionDelay(node, now);
                     pollTimeout = Math.min(pollTimeout, nodeTimeout);
-                    log.trace("{}: client is not ready to send to {}.  Must delay {} ms",
clientId, node, nodeTimeout);
+                    log.trace("Client is not ready to send to {}. Must delay {} ms", node,
nodeTimeout);
                     continue;
                 }
                 Call call = calls.remove(0);
@@ -781,8 +782,7 @@ public class KafkaAdminClient extends AdminClient {
                     continue;
                 }
                 ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder,
now, true);
-                log.trace("{}: sending {} to {}. correlationId={}", clientId, requestBuilder,
node,
-                    clientRequest.correlationId());
+                log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
                 client.send(clientRequest, now);
                 getOrCreateListValue(callsInFlight, node.idString()).add(call);
                 correlationIdToCalls.put(clientRequest.correlationId(), call);
@@ -793,8 +793,8 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Time out expired calls that are in flight.
          *
-         * Calls that are in flight may have been partially or completely sent over the wire.
 They may
-         * even be in the process of being processed by the remote server.  At the moment,
our only option
+         * Calls that are in flight may have been partially or completely sent over the wire.
They may
+         * even be in the process of being processed by the remote server. At the moment,
our only option
          * to time them out is to close the entire connection.
          *
          * @param processor         The timeout processor.
@@ -807,25 +807,25 @@ public class KafkaAdminClient extends AdminClient {
                 if (contexts.isEmpty())
                     continue;
                 String nodeId = entry.getKey();
-                // We assume that the first element in the list is the earliest.  So it should
be the
+                // We assume that the first element in the list is the earliest. So it should
be the
                 // only one we need to check the timeout for.
                 Call call = contexts.get(0);
                 if (processor.callHasExpired(call)) {
                     if (call.aborted) {
-                        log.warn("{}: aborted call {} is still in callsInFlight.", clientId,
call);
+                        log.warn("Aborted call {} is still in callsInFlight.", call);
                     } else {
-                        log.debug("{}: Closing connection to {} to time out {}", clientId,
nodeId, call);
+                        log.debug("Closing connection to {} to time out {}", nodeId, call);
                         call.aborted = true;
                         client.disconnect(nodeId);
                         numTimedOut++;
-                        // We don't remove anything from the callsInFlight data structure.
 Because the connection
+                        // We don't remove anything from the callsInFlight data structure.
Because the connection
                         // has been closed, the calls should be returned by the next client#poll(),
                         // and handled at that point.
                     }
                 }
             }
             if (numTimedOut > 0)
-                log.debug("{}: timed out {} call(s) in flight.", clientId, numTimedOut);
+                log.debug("Timed out {} call(s) in flight.", numTimedOut);
         }
 
         /**
@@ -844,9 +844,9 @@ public class KafkaAdminClient extends AdminClient {
                 Call call = correlationIdToCall.get(correlationId);
                 if (call == null) {
                     // If the server returns information about a correlation ID we didn't
use yet,
-                    // an internal server error has occurred.  Close the connection and log
an error message.
+                    // an internal server error has occurred. Close the connection and log
an error message.
                     log.error("Internal server error on {}: server returned information about
unknown " +
-                        "correlation ID {}.  requestHeader = {}", response.destination(),
correlationId,
+                        "correlation ID {}, requestHeader = {}", response.destination(),
correlationId,
                         response.requestHeader());
                     client.disconnect(response.destination());
                     continue;
@@ -861,7 +861,7 @@ public class KafkaAdminClient extends AdminClient {
                     continue;
                 }
 
-                // Handle the result of the call.  This may involve retrying the call, if
we got a
+                // Handle the result of the call. This may involve retrying the call, if
we got a
                 // retryible exception.
                 if (response.versionMismatch() != null) {
                     call.fail(now, response.versionMismatch());
@@ -873,10 +873,11 @@ public class KafkaAdminClient extends AdminClient {
                     try {
                         call.handleResponse(response.responseBody());
                         if (log.isTraceEnabled())
-                            log.trace("{}: {} got response {}", clientId, call, response.responseBody());
+                            log.trace("{} got response {}", call,
+                                    response.responseBody().toString(response.requestHeader().apiVersion()));
                     } catch (Throwable t) {
                         if (log.isTraceEnabled())
-                            log.trace("{}: {} handleResponse failed with {}", clientId, call,
prettyPrintException(t));
+                            log.trace("{} handleResponse failed with {}", call, prettyPrintException(t));
                         call.fail(now, t);
                     }
                 }
@@ -886,16 +887,14 @@ public class KafkaAdminClient extends AdminClient {
         private synchronized boolean threadShouldExit(long now, long curHardShutdownTimeMs,
                 Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls)
{
             if (newCalls.isEmpty() && callsToSend.isEmpty() && correlationIdToCalls.isEmpty())
{
-                log.trace("{}: all work has been completed, and the I/O thread is now " +
-                    "exiting.", clientId);
+                log.trace("All work has been completed, and the I/O thread is now exiting.");
                 return true;
             }
             if (now > curHardShutdownTimeMs) {
-                log.info("{}: forcing a hard I/O thread shutdown.  Requests in progress will
" +
-                    "be aborted.", clientId);
+                log.info("Forcing a hard I/O thread shutdown. Requests in progress will be
aborted.");
                 return true;
             }
-            log.debug("{}: hard shutdown in {} ms.", clientId, curHardShutdownTimeMs - now);
+            log.debug("Hard shutdown in {} ms.", curHardShutdownTimeMs - now);
             return false;
         }
 
@@ -922,7 +921,7 @@ public class KafkaAdminClient extends AdminClient {
             Integer prevMetadataVersion = null;
 
             long now = time.milliseconds();
-            log.trace("{} thread starting", clientId);
+            log.trace("Thread starting");
             while (true) {
                 // Check if the AdminClient thread should shut down.
                 long curHardShutdownTimeMs = hardShutdownTimeMs.get();
@@ -950,9 +949,9 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 // Wait for network responses.
-                log.trace("{}: entering KafkaClient#poll(timeout={})", clientId, pollTimeout);
+                log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);
                 List<ClientResponse> responses = client.poll(pollTimeout, now);
-                log.trace("{}: KafkaClient#poll retrieved {} response(s)", clientId, responses.size());
+                log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
@@ -968,18 +967,18 @@ public class KafkaAdminClient extends AdminClient {
             numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
                     "The AdminClient thread has exited.");
             if (numTimedOut > 0) {
-                log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut);
+                log.debug("Timed out {} remaining operations.", numTimedOut);
             }
             closeQuietly(client, "KafkaClient");
             closeQuietly(metrics, "Metrics");
-            log.debug("{}: exiting AdminClientRunnable thread.", clientId);
+            log.debug("Exiting AdminClientRunnable thread.");
         }
 
         /**
          * Queue a call for sending.
          *
-         * If the AdminClient thread has exited, this will fail.  Otherwise, it will succeed
(even
-         * if the AdminClient is shutting down.)  This function should called when retrying
an
+         * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed
(even
+         * if the AdminClient is shutting down). This function should called when retrying
an
          * existing call.
          *
          * @param call      The new call object.
@@ -987,8 +986,7 @@ public class KafkaAdminClient extends AdminClient {
          */
         void enqueue(Call call, long now) {
             if (log.isDebugEnabled()) {
-                log.debug("{}: queueing {} with a timeout {} ms from now.",
-                    clientId, call, call.deadlineMs - now);
+                log.debug("Queueing {} with a timeout {} ms from now.", call, call.deadlineMs
- now);
             }
             boolean accepted = false;
             synchronized (this) {
@@ -1000,7 +998,7 @@ public class KafkaAdminClient extends AdminClient {
             if (accepted) {
                 client.wakeup(); // wake the thread if it is in poll()
             } else {
-                log.debug("{}: the AdminClient thread has exited.  Timing out {}.", clientId,
call);
+                log.debug("The AdminClient thread has exited. Timing out {}.", call);
                 call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has
exited."));
             }
         }
@@ -1015,7 +1013,7 @@ public class KafkaAdminClient extends AdminClient {
          */
         void call(Call call, long now) {
             if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
-                log.debug("{}: the AdminClient is not accepting new calls.  Timing out {}.",
clientId, call);
+                log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
                 call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is
not accepting new calls."));
             } else {
                 enqueue(call, now);
@@ -1060,7 +1058,7 @@ public class KafkaAdminClient extends AdminClient {
                         }
                     }
                 }
-                // The server should send back a response for every topic.  But do a sanity
check anyway.
+                // The server should send back a response for every topic. But do a sanity
check anyway.
                 for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet())
{
                     KafkaFutureImpl<Void> future = entry.getValue();
                     if (!future.isDone()) {
@@ -1113,7 +1111,7 @@ public class KafkaAdminClient extends AdminClient {
                         }
                     }
                 }
-                // The server should send back a response for every topic.  But do a sanity
check anyway.
+                // The server should send back a response for every topic. But do a sanity
check anyway.
                 for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet())
{
                     KafkaFutureImpl<Void> future = entry.getValue();
                     if (!future.isDone()) {


Mime
View raw message