kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Add logging when commitSync fails in StreamTask
Date Wed, 08 Feb 2017 21:01:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e7c869e65 -> c7c113af6


MINOR: Add logging when commitSync fails in StreamTask

When `consumer.commitSync` fails in `StreamTask`, the `CommitFailedException` bubbles up to
[here](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L780)
and swallowed.  It'd be great if we knew which offsets failed to commit so that we may rewind
our consumer.

Author: J$ <jmonette@homeaway.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #2514 from jmoney8080/jsm.addCommitLogLogging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c7c113af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c7c113af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c7c113af

Branch: refs/heads/trunk
Commit: c7c113af63d8b4c49f16297af22b3a02528a500c
Parents: e7c869e
Author: Jonathan Monette <jmonette@homeaway.com>
Authored: Wed Feb 8 12:58:46 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 8 13:00:49 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamTask.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c7c113af/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e0dc2dc..be77856 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -294,7 +295,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
                 stateMgr.putOffsetLimit(partition, offset);
             }
-            consumer.commitSync(consumedOffsetsAndMetadata);
+            try {
+                consumer.commitSync(consumedOffsetsAndMetadata);
+            } catch (final CommitFailedException cfe) {
+                log.warn("{} Failed offset commits: {} ", logPrefix, consumedOffsetsAndMetadata);
+                throw cfe;
+            }
             commitOffsetNeeded = false;
         }
 


Mime
View raw message