kafka-commits mailing list archives

From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7510: preventing data being leaked to logs by default (#5834)
Date Fri, 04 Jan 2019 12:02:07 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f8113c0  KAFKA-7510: preventing data being leaked to logs by default (#5834)
f8113c0 is described below

commit f8113c053b1266916ae3ce73385d4ed336ab2730
Author: forficate <260708+forficate@users.noreply.github.com>
AuthorDate: Fri Jan 4 23:01:57 2019 +1100

    KAFKA-7510: preventing data being leaked to logs by default (#5834)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../processor/internals/RecordCollectorImpl.java   | 28 ++++++++++++++++------
 .../processor/internals/RecordCollectorTest.java   |  2 +-
 2 files changed, 22 insertions(+), 8 deletions(-)
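
The fix relies on a standard SLF4J convention: when the final argument to a
logging call is a Throwable with no matching {} placeholder, SLF4J prints its
stack trace instead of formatting it into the message. A minimal sketch of the
resulting pattern (class and method names here are illustrative, not part of
the commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RecordErrorLoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(RecordErrorLoggingSketch.class);

        void onSendError(final String key, final String value, final long timestamp,
                         final String topic, final Exception exception) {
            // ERROR level carries only non-sensitive metadata; the trailing exception
            // has no {} placeholder, so SLF4J logs its stack trace.
            log.error("Error sending record to topic {} due to {}; "
                    + "Enable TRACE logging to view failed record key and value.",
                topic, exception.getMessage(), exception);

            // Record key and value appear only at TRACE, which is off by default.
            log.trace("Failed message: key {} value {} timestamp {}", key, value, timestamp);
        }
    }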

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index d3a0030..757a5fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -55,8 +55,9 @@ public class RecordCollectorImpl implements RecordCollector {
     private final Map<TopicPartition, Long> offsets;
     private final ProductionExceptionHandler productionExceptionHandler;
 
-    private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
-        "No more records will be sent and no more offsets will be recorded for this task.";
+    private final static String LOG_MESSAGE = "Error sending record to topic {} due to {}; " +
+        "No more records will be sent and no more offsets will be recorded for this task. " +
+        "Enable TRACE logging to view failed record key and value.";
     private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
     private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
     private volatile KafkaException sendException;
@@ -128,7 +129,11 @@ public class RecordCollectorImpl implements RecordCollector {
             errorLogMessage += PARAMETER_HINT;
             errorMessage += PARAMETER_HINT;
         }
-        log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
+        log.error(errorLogMessage, topic, exception.getMessage(), exception);
+
+        // KAFKA-7510 put message key and value in TRACE level log so we don't leak data by default
+        log.trace("Failed message: key {} value {} timestamp {}", key, value, timestamp);
+
         sendException = new StreamsException(
             String.format(
                 errorMessage,
@@ -172,7 +177,11 @@ public class RecordCollectorImpl implements RecordCollector {
                     } else {
                         if (sendException == null) {
                             if (exception instanceof ProducerFencedException) {
-                                log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception.toString());
+                                log.warn(LOG_MESSAGE, topic, exception.getMessage(), exception);
+
+                                // KAFKA-7510 put message key and value in TRACE level log so we don't leak data by default
+                                log.trace("Failed message: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
+
                                 sendException = new ProducerFencedException(
                                     String.format(
                                         EXCEPTION_MESSAGE,
@@ -192,10 +201,15 @@ public class RecordCollectorImpl implements RecordCollector {
                                     recordSendError(key, value, timestamp, topic, exception);
                                 } else {
                                     log.warn(
-                                        "Error sending records (key=[{}] value=[{}] timestamp=[{}]) to topic=[{}] and partition=[{}]; " +
-                                            "The exception handler chose to CONTINUE processing in spite of this error.",
-                                        key, value, timestamp, topic, partition, exception
+                                        "Error sending records topic=[{}] and partition=[{}]; " +
+                                            "The exception handler chose to CONTINUE processing in spite of this error. " +
+                                            "Enable TRACE logging to view failed messages key and value.",
+                                        topic, partition, exception
                                     );
+
+                                    // KAFKA-7510 put message key and value in TRACE level log so we don't leak data by default
+                                    log.trace("Failed message: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
+
                                     skippedRecordsSensor.record();
                                 }
                             }
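
For context, the WARN branch above runs only when the configured
ProductionExceptionHandler returns CONTINUE for a failed send. A hedged sketch
of such a handler (the interface and its ProductionExceptionHandlerResponse
enum are existing Kafka Streams APIs; the class name is illustrative):

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class ContinueOnSendErrorHandler implements ProductionExceptionHandler {
        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            // Skip the failed record and keep processing; after this commit the
            // collector logs the record's key and value only at TRACE.
            return ProductionExceptionHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) { }
    }

Such a handler would be registered via the "default.production.exception.handler"
streams config (StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG).
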
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 0bc65cc..a7da2cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -230,7 +230,7 @@ public class RecordCollectorTest {
         });
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
         assertEquals(1.0, metrics.metrics().get(metricName).metricValue());
-        assertTrue(logCaptureAppender.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error."));
+        assertTrue(logCaptureAppender.getMessages().contains("test Error sending records topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error. Enable TRACE logging to view failed messages key and value."));
         LogCaptureAppender.unregister(logCaptureAppender);
     }
 


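After this change, seeing the failed record payloads requires an explicit
opt-in. Assuming a Log4j 1.x properties file (the logging backend Kafka shipped
with at the time; exact setup varies by deployment) and the conventional
class-named logger, enabling TRACE for just this class would look like:

    log4j.logger.org.apache.kafka.streams.processor.internals.RecordCollectorImpl=TRACE

This keeps record keys and values out of logs at the default level while
preserving a deliberate debugging path.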