kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5980: FailOnInvalidTimestamp does not log error
Date Wed, 04 Oct 2017 22:11:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 713a67fdd -> 5792f2fb3


KAFKA-5980: FailOnInvalidTimestamp does not log error

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Ted Yu <yuzhihong@gmail.com>, Denis Bolshakov

Closes #3966 from mjsax/kafka-5980-FailOnInvalidTimestamp-does-not-log-error


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5792f2fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5792f2fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5792f2fb

Branch: refs/heads/trunk
Commit: 5792f2fb3db69333bfd22b57b00b42336dc16aa9
Parents: 713a67f
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Oct 4 15:10:59 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Oct 4 15:10:59 2017 -0700

----------------------------------------------------------------------
 .../streams/processor/FailOnInvalidTimestamp.java    | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5792f2fb/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
index e8fc78c..87cb0de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Retrieves embedded metadata timestamps from Kafka messages.
@@ -45,6 +47,7 @@ import org.apache.kafka.streams.errors.StreamsException;
  */
 @InterfaceStability.Evolving
 public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
+    private static final Logger log = LoggerFactory.getLogger(FailOnInvalidTimestamp.class);
 
     /**
      * Raises an exception on every call.
@@ -60,10 +63,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
                                    final long recordTimestamp,
                                    final long previousTimestamp)
             throws StreamsException {
-        throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
-            "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
-            "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
-            "Use a different TimestampExtractor to process this data.");
+
+        final String message = "Input record " + record + " has invalid (negative) timestamp. " +
+            "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding " +
+            "a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
+            "Use a different TimestampExtractor to process this data.";
+
+        log.error(message);
+        throw new StreamsException(message);
     }
 
 }


Mime
View raw message