kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9033; Use consumer/producer identity in generated clientId (#7514)
Date Tue, 15 Oct 2019 22:14:16 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 8f8160b  KAFKA-9033; Use consumer/producer identity in generated clientId (#7514)
8f8160b is described below

commit 8f8160b3cdadaf91ed5a9c7a6780fb59ef46cd66
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Oct 15 14:52:26 2019 -0700

    KAFKA-9033; Use consumer/producer identity in generated clientId (#7514)
    
    By default, if the user does not configure a `client.id`, then we use a very generic identifier,
such as `consumer-15`. It is more useful to include identifying information when available
such as `group.id` for the consumer and `transactional.id` for the producer.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      | 25 ++++++++++++++++------
 .../kafka/clients/producer/KafkaProducer.java      | 17 +++++++++++----
 2 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7e02341..cacf8d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -21,6 +21,7 @@ import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapt
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -667,15 +668,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     @SuppressWarnings("unchecked")
     private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V>
valueDeserializer) {
         try {
-            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
-            if (clientId.isEmpty())
-                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
-            this.clientId = clientId;
-            this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
-                                                                                 GroupRebalanceConfig.ProtocolType.CONSUMER);
+                    GroupRebalanceConfig.ProtocolType.CONSUMER);
+
+            this.groupId = groupRebalanceConfig.groupId;
+            this.clientId = buildClientId(config.getString(CommonClientConfigs.CLIENT_ID_CONFIG),
groupRebalanceConfig);
+
             LogContext logContext;
+
             // If group.instance.id is set, we will append it to the log context.
             if (groupRebalanceConfig.groupInstanceId.isPresent()) {
                 logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get()
+
@@ -854,6 +854,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         this.groupId = groupId;
     }
 
+    private static String buildClientId(String configuredClientId, GroupRebalanceConfig rebalanceConfig)
{
+        if (!configuredClientId.isEmpty())
+            return configuredClientId;
+
+        if (rebalanceConfig.groupId != null && !rebalanceConfig.groupId.isEmpty())
+            return "consumer-" + rebalanceConfig.groupId + "-" + rebalanceConfig.groupInstanceId.orElseGet(()
->
+                    CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() + "");
+
+        return "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
+    }
+
     private static Metrics buildMetrics(ConsumerConfig config, Time time, String clientId)
{
         Map<String, String> metricsTags = Collections.singletonMap(CLIENT_ID_METRIC_TAG,
clientId);
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3b3589f..4b7f3d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -329,13 +329,12 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = time;
-            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
-            if (clientId.length() <= 0)
-                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
-            this.clientId = clientId;
 
             String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)
?
                     (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)
: null;
+
+            this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG),
transactionalId);
+
             LogContext logContext;
             if (transactionalId == null)
                 logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
@@ -434,6 +433,16 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
         }
     }
 
+    private static String buildClientId(String configuredClientId, String transactionalId)
{
+        if (!configuredClientId.isEmpty())
+            return configuredClientId;
+
+        if (transactionalId != null)
+            return "producer-" + transactionalId;
+
+        return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
+    }
+
     // visible for testing
     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata)
{
         int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager
!= null);


Mime
View raw message