kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)
Date Wed, 06 Mar 2019 02:12:59 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 5e16173  MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)
5e16173 is described below

commit 5e161736597e0430dd276ac9007d8559733af3e2
Author: Bob Barrett <bob.barrett@outlook.com>
AuthorDate: Tue Mar 5 21:12:47 2019 -0500

    MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)
    SecurityTest.test_client_ssl_endpoint_validation_failure is failing because it greps for
'SSLHandshakeException in the consumer and producer log files. With the fix for KAKFA-7773,
the test uses the VerifiableConsumer instead of the ConsoleConsumer, which does not log the
exception stack trace to the service log. This patch catches exceptions in the VerifiableConsumer
and logs them in order to fix the test. Tested by running the test locally.
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
 .../src/main/java/org/apache/kafka/tools/VerifiableConsumer.java  | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 1297841..43d30d0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -43,6 +43,8 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
@@ -82,6 +84,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener
+    private static final Logger log = LoggerFactory.getLogger(VerifiableConsumer.class);
     private final ObjectMapper mapper = new ObjectMapper();
     private final PrintStream out;
     private final KafkaConsumer<String, String> consumer;
@@ -233,6 +237,10 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback,
         } catch (WakeupException e) {
             // ignore, we are closing
+            log.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.",
+        } catch (Throwable t) {
+            // Log the error so it goes to the service log and not stdout
+            log.error("Error during processing, terminating consumer process: ", t);
         } finally {
             printJson(new ShutdownComplete());

View raw message