kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Adjust Streams parameter hint on TimeoutException (#6280)
Date Wed, 03 Jul 2019 04:14:19 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 298a9bc  MINOR: Adjust Streams parameter hint on TimeoutException (#6280)
298a9bc is described below

commit 298a9bc397878ddc755b91c44aef0c928be6eaf9
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Tue Jul 2 21:14:06 2019 -0700

    MINOR: Adjust Streams parameter hint on TimeoutException (#6280)
    
    KIP-91 was included in Kafka 2.1.0, so we should mention
    `delivery.timeout.ms` in the hint as it's the config that
    users would want to change in most cases.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>,
Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/processor/internals/RecordCollectorImpl.java      | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 4edbc3d..c6bcda6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -59,7 +59,9 @@ public class RecordCollectorImpl implements RecordCollector {
         "No more records will be sent and no more offsets will be recorded for this task.
" +
         "Enable TRACE logging to view failed record key and value.";
     private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous
record (timestamp %d) to topic %s due to %s";
-    private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries`
and `retry.backoff.ms` to avoid this error.";
+    private final static String PARAMETER_HINT = "\nYou can increase the producer configs
`delivery.timeout.ms` and/or " +
+        "`retries` to avoid this error. Note that `retries` is set to infinite by default.";
+
     private volatile KafkaException sendException;
 
     public RecordCollectorImpl(final String streamTaskId,
@@ -125,6 +127,8 @@ public class RecordCollectorImpl implements RecordCollector {
     ) {
         String errorLogMessage = LOG_MESSAGE;
         String errorMessage = EXCEPTION_MESSAGE;
+        // There is no documented API for detecting retriable errors, so we rely on `RetriableException`
+        // even though it's an implementation detail (i.e. we do the best we can given what's
available)
         if (exception instanceof RetriableException) {
             errorLogMessage += PARAMETER_HINT;
             errorMessage += PARAMETER_HINT;


Mime
View raw message