kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix logged timeout in KafkaProducer.close() (#4623)
Date Sat, 21 Jul 2018 10:14:05 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee0c2ce  MINOR: Fix logged timeout in KafkaProducer.close() (#4623)
ee0c2ce is described below

commit ee0c2cee214b3d46f83562467bb1e4f5165f172c
Author: wangshao <wangshaono1@gmail.com>
AuthorDate: Sat Jul 21 18:13:58 2018 +0800

    MINOR: Fix logged timeout in KafkaProducer.close() (#4623)
    
    The log line says `ms`, but the actual value could represent a
    different time unit depending on what the user provided.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
---
 .../org/apache/kafka/clients/producer/KafkaProducer.java    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3a6717b..cb52941 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1078,21 +1078,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (timeout < 0)
             throw new IllegalArgumentException("The timeout cannot be negative.");
 
-        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
+        long timeoutMs = timeUnit.toMillis(timeout);
+        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
+
         // this will keep track of the first encountered exception
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
-        if (timeout > 0) {
+        if (timeoutMs > 0) {
             if (invokedFromCallback) {
                 log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
-                        "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
+                        "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
+                        timeoutMs);
             } else {
                 // Try to close gracefully.
                 if (this.sender != null)
                     this.sender.initiateClose();
                 if (this.ioThread != null) {
                     try {
-                        this.ioThread.join(timeUnit.toMillis(timeout));
+                        this.ioThread.join(timeoutMs);
                     } catch (InterruptedException t) {
                         firstException.compareAndSet(null, new InterruptException(t));
                         log.error("Interrupted while joining ioThread", t);
@@ -1103,7 +1106,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
         if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
             log.info("Proceeding to force close the producer since pending requests could not be completed " +
-                    "within timeout {} ms.", timeout);
+                    "within timeout {} ms.", timeoutMs);
             this.sender.forceClose();
             // Only join the sender thread when not calling from callback.
             if (!invokedFromCallback) {


Mime
View raw message