kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: AdminClient should register with `AppInfoParser`
Date Sun, 13 Aug 2017 23:12:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 2a4eeb1c6 -> f68d67201


MINOR: AdminClient should register with `AppInfoParser`

Also make "created" message more consistent across clients.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3658 from ijuma/admin-client-should-register-with-app-info-parser

(cherry picked from commit f4dc5ac15ca120d8240ac3b8ba28139ccb26cf91)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f68d6720
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f68d6720
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f68d6720

Branch: refs/heads/0.11.0
Commit: f68d6720135c9e305c2e6116e1e1b3d45d6acc59
Parents: 2a4eeb1
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Aug 14 00:11:58 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Aug 14 00:12:12 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 7 ++++++-
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 4 ++--
 .../java/org/apache/kafka/clients/producer/KafkaProducer.java | 4 ++--
 3 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f68d6720/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ed32bff..87ede00 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -79,6 +79,7 @@ import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.Resource;
 import org.apache.kafka.common.requests.ResourceType;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
@@ -353,7 +354,8 @@ public class KafkaAdminClient extends AdminClient {
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
         config.logUnused();
-        log.debug("Created Kafka admin client {}", this.clientId);
+        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+        log.debug("Kafka admin client with client id {} created", this.clientId);
         thread.start();
     }
 
@@ -388,6 +390,9 @@ public class KafkaAdminClient extends AdminClient {
         try {
             // Wait for the thread to be joined.
             thread.join();
+
+            AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+            
             log.debug("{}: closed.", clientId);
         } catch (InterruptedException e) {
             log.debug("{}: interrupted while joining I/O thread", clientId, e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f68d6720/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 155f2e0..3e60488 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -756,7 +756,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
 
-            log.debug("Kafka consumer created");
+            log.debug("Kafka consumer with client id {} created", clientId);
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
             // this is to prevent resource leak. see KAFKA-2121
@@ -1614,7 +1614,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
         ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-        log.debug("The Kafka consumer has closed.");
+        log.debug("Kafka consumer with client id {} has been closed", clientId);
         Throwable exception = firstException.get();
         if (exception != null && !swallowException) {
             if (exception instanceof InterruptException) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f68d6720/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1476aca..19e2ca4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -407,7 +407,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
             this.errors = this.metrics.sensor("errors");
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
-            log.debug("Kafka producer started");
+            log.debug("Kafka producer with client id {} created", clientId);
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed this is to
prevent resource leak. see KAFKA-2121
             close(0, TimeUnit.MILLISECONDS, true);
@@ -1012,7 +1012,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
         ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-        log.debug("The Kafka producer has closed.");
+        log.debug("Kafka producer with client id {} has been closed", clientId);
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to close kafka producer", firstException.get());
     }


Mime
View raw message